Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FORCE_CPU_ONLY option #69

Merged
merged 7 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 18 additions & 29 deletions agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ def _start_net_client(docker_api=None):
]

if constants.SLY_EXTRA_CA_CERTS() and os.path.exists(constants.SLY_EXTRA_CA_CERTS()):
envs.append(f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}")
volumes.append(f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}")
envs.append(
f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}"
)
volumes.append(
f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}"
)

log_config = LogConfig(
type="local", config={"max-size": "1m", "max-file": "1", "compress": "false"}
Expand Down Expand Up @@ -178,26 +182,9 @@ def _start_net_client(docker_api=None):
raise e


def _nvidia_runtime_check():
def _is_runtime_changed(new_runtime):
container_info = get_container_info()
runtime = container_info["HostConfig"]["Runtime"]
if runtime == "nvidia":
return False
sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
docker_api = docker.from_env()
image = constants.DEFAULT_APP_DOCKER_IMAGE()
try:
docker_api.containers.run(
image,
command="nvidia-smi",
runtime="nvidia",
remove=True,
)
sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
return True
except Exception as e:
sly.logger.debug("NVIDIA runtime is not available.")
return False
return container_info["HostConfig"]["Runtime"] != new_runtime


def main(args):
Expand Down Expand Up @@ -235,21 +222,23 @@ def init_envs():
"Can not update agent options. Agent will be started with current options"
)
return
restart_with_nvidia_runtime = _nvidia_runtime_check()
if new_envs.get(constants._FORCE_CPU_ONLY, "false") == "true":
runtime = "runc"
runtime_changed = _is_runtime_changed(runtime)
else:
runtime = agent_utils.maybe_update_runtime()
runtime_changed = _is_runtime_changed(runtime)
envs_changes, volumes_changes, new_ca_cert_path = agent_utils.get_options_changes(
new_envs, new_volumes, ca_cert
)
if (
len(envs_changes) > 0
or len(volumes_changes) > 0
or restart_with_nvidia_runtime
or runtime_changed
or new_ca_cert_path is not None
):
docker_api = docker.from_env()
container_info = get_container_info()
runtime = (
"nvidia" if restart_with_nvidia_runtime else container_info["HostConfig"]["Runtime"]
)

# TODO: only set true if some NET_CLIENT envs changed
new_envs[constants._UPDATE_SLY_NET_AFTER_RESTART] = "true"
Expand All @@ -262,9 +251,9 @@ def init_envs():
for k, v in envs_changes.items()
},
"volumes_changes": volumes_changes,
"runtime_changes": {container_info["HostConfig"]["Runtime"]: runtime}
if restart_with_nvidia_runtime
else {},
"runtime_changes": (
{container_info["HostConfig"]["Runtime"]: runtime} if runtime_changed else {}
),
"ca_cert_changed": bool(new_ca_cert_path),
},
)
Expand Down
61 changes: 54 additions & 7 deletions agent/worker/agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class AgentOptionsJsonFields:
NET_CLIENT_DOCKER_IMAGE = "dockerImage"
NET_SERVER_PORT = "netServerPort"
DOCKER_IMAGE = "dockerImage"
FORCE_CPU_ONLY = "forceCPUOnly"
LOG_LEVEL = "logLevel"


def create_img_meta_str(img_size_bytes, width, height):
Expand Down Expand Up @@ -584,7 +586,7 @@ def get_agent_options(server_address=None, token=None, timeout=60) -> dict:

url = constants.PUBLIC_API_SERVER_ADDRESS() + "agents.options.info"
resp = requests.post(url=url, json={"token": token}, timeout=timeout)
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
try:
text = resp.text
except:
Expand All @@ -601,7 +603,7 @@ def get_instance_version(server_address=None, timeout=60):
server_address = constants.SERVER_ADDRESS()
url = constants.PUBLIC_API_SERVER_ADDRESS() + "instance.version"
resp = requests.get(url=url, timeout=timeout)
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code in (400, 401, 403, 404):
return None
try:
Expand Down Expand Up @@ -699,9 +701,6 @@ def update_env_param(name, value, default=None):
)
update_env_param(constants._HTTPS_PROXY, http_proxy, optional_defaults[constants._HTTPS_PROXY])
update_env_param(constants._NO_PROXY, no_proxy, optional_defaults[constants._NO_PROXY])
# DOCKER_IMAGE
# maybe_update_env_param(constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None))

update_env_param(
constants._NET_CLIENT_DOCKER_IMAGE,
net_options.get(AgentOptionsJsonFields.NET_CLIENT_DOCKER_IMAGE, None),
Expand All @@ -715,6 +714,16 @@ def update_env_param(name, value, default=None):
update_env_param(
constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None)
)
update_env_param(
constants._FORCE_CPU_ONLY,
str(options.get(AgentOptionsJsonFields.FORCE_CPU_ONLY, False)).lower(),
optional_defaults[constants._NET_SERVER_PORT],
)
update_env_param(
constants._LOG_LEVEL,
options.get(AgentOptionsJsonFields.LOG_LEVEL, None),
optional_defaults[constants._LOG_LEVEL],
)

agent_host_dir = options.get(AgentOptionsJsonFields.AGENT_HOST_DIR, "").strip()
if agent_host_dir == "":
Expand Down Expand Up @@ -782,6 +791,7 @@ def _volumes_changes(volumes) -> dict:
changes[key] = value
return changes


def _is_bind_attached(container_info, bind_path):
vols = binds_to_volumes_dict(container_info.get("HostConfig", {}).get("Binds", []))

Expand All @@ -791,15 +801,17 @@ def _is_bind_attached(container_info, bind_path):

return False


def _copy_file_to_container(container, src, dst_dir: str):
    """Upload a single local file into *dst_dir* of a Docker *container*.

    Docker's ``put_archive`` API only accepts tar streams, so the file is
    first packed into an in-memory tar archive (stored under its basename)
    and then shipped to the container in one call.
    """
    archive_buffer = io.BytesIO()
    with open(src, "rb") as src_file:
        with tarfile.open(fileobj=archive_buffer, mode="w|") as archive:
            member = archive.gettarinfo(fileobj=src_file)
            # Store only the basename so the file lands directly in dst_dir.
            member.name = os.path.basename(src)
            archive.addfile(member, src_file)
    container.put_archive(dst_dir, archive_buffer.getvalue())


def _ca_cert_changed(ca_cert) -> str:
if ca_cert is None:
return None
Expand Down Expand Up @@ -832,7 +844,12 @@ def _ca_cert_changed(ca_cert) -> str:
tmp_container = docker_api.containers.create(
agent_image,
"",
volumes={constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {"bind": constants.SLY_EXTRA_CA_CERTS_DIR(), "mode": "rw"}},
volumes={
constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {
"bind": constants.SLY_EXTRA_CA_CERTS_DIR(),
"mode": "rw",
}
},
)

_copy_file_to_container(tmp_container, cert_path, constants.SLY_EXTRA_CA_CERTS_DIR())
Expand Down Expand Up @@ -958,3 +975,33 @@ def restart_agent(
"Docker container is spawned",
extra={"container_id": container.id, "container_name": container.name},
)


def nvidia_runtime_is_available():
    """Check whether Docker can run containers with the NVIDIA runtime.

    Launches a throwaway container (image from
    ``constants.DEFAULT_APP_DOCKER_IMAGE()``) that runs ``nvidia-smi`` with
    ``runtime="nvidia"``. This is a best-effort probe: any failure (runtime
    not configured, no driver, image pull error, ...) means "not available".

    Returns:
        bool: True if the probe container ran successfully, False otherwise.
    """
    docker_api = docker.from_env()
    image = constants.DEFAULT_APP_DOCKER_IMAGE()
    try:
        docker_api.containers.run(
            image,
            command="nvidia-smi",
            runtime="nvidia",
            remove=True,
        )
        return True
    except Exception as e:
        # Fix: the exception was previously bound but silently discarded.
        # Keep the broad catch (deliberate best-effort probe), but record
        # the actual reason at debug level to aid troubleshooting.
        sly.logger.debug(f"NVIDIA runtime check failed: {repr(e)}")
        return False


def maybe_update_runtime():
    """Return the Docker runtime the agent should use.

    Keeps "nvidia" when it is already the active runtime of this container;
    otherwise probes whether the NVIDIA runtime can be enabled and returns
    "nvidia" on success, or the current runtime when the probe fails.
    """
    current_runtime = get_container_info()["HostConfig"]["Runtime"]
    if current_runtime == "nvidia":
        # Already running with the NVIDIA runtime — nothing to change.
        return current_runtime
    sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
    if not nvidia_runtime_is_available():
        sly.logger.debug("NVIDIA runtime is not available.")
        return current_runtime
    sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
    return "nvidia"
19 changes: 17 additions & 2 deletions agent/worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import supervisely_lib as sly
import hashlib
import re
from supervisely_lib.io.docker_utils import PullPolicy # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.docker_utils import (
PullPolicy,
) # pylint: disable=import-error, no-name-in-module


_SERVER_ADDRESS = "SERVER_ADDRESS"
Expand Down Expand Up @@ -99,6 +101,8 @@ def TOKEN():
_UPDATE_SLY_NET_AFTER_RESTART = "UPDATE_SLY_NET_AFTER_RESTART"
_DOCKER_IMAGE = "DOCKER_IMAGE"
_CONTAINER_NAME = "CONTAINER_NAME"
_FORCE_CPU_ONLY = "FORCE_CPU_ONLY"
_LOG_LEVEL = "LOG_LEVEL"

_NET_CLIENT_DOCKER_IMAGE = "NET_CLIENT_DOCKER_IMAGE"
_NET_SERVER_PORT = "NET_SERVER_PORT"
Expand Down Expand Up @@ -163,6 +167,8 @@ def TOKEN():
_AGENT_RESTART_COUNT: 0,
_SLY_EXTRA_CA_CERTS_DIR: "/sly_certs",
_SLY_EXTRA_CA_CERTS_VOLUME_NAME: f"supervisely-agent-ca-certs-{TOKEN()[:8]}",
_FORCE_CPU_ONLY: "false",
_LOG_LEVEL: "INFO",
}


Expand Down Expand Up @@ -261,7 +267,7 @@ def AGENT_TASKS_DIR():

def AGENT_TASK_SHARED_DIR():
"""default: /sly_agent/tasks/task_shared"""
return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) # pylint: disable=no-member
return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) # pylint: disable=no-member


def AGENT_TMP_DIR():
Expand Down Expand Up @@ -658,6 +664,7 @@ def AGENT_RESTART_COUNT():
def SLY_EXTRA_CA_CERTS_DIR():
    """In-container directory where extra CA certificates live (default: /sly_certs)."""
    return read_optional_setting(_SLY_EXTRA_CA_CERTS_DIR)


def SLY_EXTRA_CA_CERTS_FILEPATH():
    """Full path of the instance CA chain file inside SLY_EXTRA_CA_CERTS_DIR()."""
    return os.path.join(SLY_EXTRA_CA_CERTS_DIR(), "instance_ca_chain.crt")

Expand All @@ -666,6 +673,14 @@ def SLY_EXTRA_CA_CERTS_VOLUME_NAME():
return read_optional_setting(_SLY_EXTRA_CA_CERTS_VOLUME_NAME)


def FORCE_CPU_ONLY():
    """Boolean flag: when true, the agent avoids the NVIDIA runtime (default: "false")."""
    return sly.env.flag_from_env(read_optional_setting(_FORCE_CPU_ONLY))


def LOG_LEVEL():
    """Configured log level string (default: "INFO")."""
    return read_optional_setting(_LOG_LEVEL)


def init_constants():
sly.fs.mkdir(AGENT_LOG_DIR())
sly.fs.mkdir(AGENT_TASKS_DIR())
Expand Down
29 changes: 19 additions & 10 deletions agent/worker/task_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@

import supervisely_lib as sly
from .task_dockerized import TaskDockerized
from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import flatten_json, modify_keys # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import SUPERVISELY_TASK_ID # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import Api # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.fs import ( # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
dump_json_file,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
flatten_json,
modify_keys,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import (
SUPERVISELY_TASK_ID,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import Api # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.fs import ( # pylint: disable=import-error, no-name-in-module
ensure_base_path,
silent_remove,
get_file_name,
Expand All @@ -34,7 +41,9 @@
file_exists,
mkdir,
)
from supervisely_lib.io.exception_handlers import handle_exceptions # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.exception_handlers import (
handle_exceptions,
) # pylint: disable=import-error, no-name-in-module

from worker import constants
from worker.agent_utils import (
Expand Down Expand Up @@ -217,7 +226,7 @@ def init_docker_image(self):
gpu = GPUFlag.from_config(self.app_config)
self.docker_runtime = "runc"

if gpu is not GPUFlag.skipped:
if gpu is not GPUFlag.skipped and not constants.FORCE_CPU_ONLY():
docker_info = self._docker_api.info()
nvidia = "nvidia"
nvidia_available = False
Expand Down Expand Up @@ -285,9 +294,9 @@ def read_dockerimage_from_config(self):
f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}",
)
)
self.info[
"docker_image"
] = f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
self.info["docker_image"] = (
f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
)

def is_isolate(self):
if self.app_config is None:
Expand Down
28 changes: 18 additions & 10 deletions agent/worker/task_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,48 @@
import supervisely_lib as sly
import os

from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
dump_json_file,
) # pylint: disable=import-error, no-name-in-module

from worker.task_dockerized import TaskDockerized, TaskStep
from worker import constants


class TaskCustom(TaskDockerized):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.action_map = {}
self.docker_runtime = 'nvidia'
if not constants.FORCE_CPU_ONLY():
self.docker_runtime = "nvidia"
else:
self.docker_runtime = "runc"

self.dir_data = os.path.join(self.dir_task, os.path.basename(TaskPaths.DATA_DIR))
self.dir_results = os.path.join(self.dir_task, os.path.basename(TaskPaths.RESULTS_DIR))
self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
self.config_path2 = os.path.join(
self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
)

def init_additional(self):
super().init_additional()
sly.fs.mkdir(self.dir_data)
sly.fs.mkdir(self.dir_results)

def download_step(self):
for model_info in self.info['models']:
self.data_mgr.download_nn(model_info['title'], self.dir_model)
for model_info in self.info["models"]:
self.data_mgr.download_nn(model_info["title"], self.dir_model)

self.logger.info("DOWNLOAD_DATA")
dump_json_file(self.info['config'], self.config_path1) # Deprecated 'task_settings.json'
dump_json_file(self.info['config'], self.config_path2) # New style task_config.json
dump_json_file(self.info["config"], self.config_path1) # Deprecated 'task_settings.json'
dump_json_file(self.info["config"], self.config_path2) # New style task_config.json

for pr_info in self.info['projects']:
self.data_mgr.download_project(self.dir_data, pr_info['title'])
for pr_info in self.info["projects"]:
self.data_mgr.download_project(self.dir_data, pr_info["title"])

self.report_step_done(TaskStep.DOWNLOAD)

Expand Down
Loading
Loading