diff --git a/agent/main.py b/agent/main.py
index 0abe028..fdb7f4a 100644
--- a/agent/main.py
+++ b/agent/main.py
@@ -104,8 +104,12 @@ def _start_net_client(docker_api=None):
     ]
     if constants.SLY_EXTRA_CA_CERTS() and os.path.exists(constants.SLY_EXTRA_CA_CERTS()):
-        envs.append(f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}")
-        volumes.append(f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}")
+        envs.append(
+            f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}"
+        )
+        volumes.append(
+            f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}"
+        )
 
     log_config = LogConfig(
         type="local", config={"max-size": "1m", "max-file": "1", "compress": "false"}
     )
@@ -178,26 +182,9 @@ def _start_net_client(docker_api=None):
         raise e
 
 
-def _nvidia_runtime_check():
+def _is_runtime_changed(new_runtime):
     container_info = get_container_info()
-    runtime = container_info["HostConfig"]["Runtime"]
-    if runtime == "nvidia":
-        return False
-    sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
-    docker_api = docker.from_env()
-    image = constants.DEFAULT_APP_DOCKER_IMAGE()
-    try:
-        docker_api.containers.run(
-            image,
-            command="nvidia-smi",
-            runtime="nvidia",
-            remove=True,
-        )
-        sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
-        return True
-    except Exception as e:
-        sly.logger.debug("NVIDIA runtime is not available.")
-        return False
+    return container_info["HostConfig"]["Runtime"] != new_runtime
 
 
 def main(args):
@@ -235,21 +222,23 @@ def init_envs():
             "Can not update agent options. Agent will be started with current options"
         )
         return
-    restart_with_nvidia_runtime = _nvidia_runtime_check()
+    if new_envs.get(constants._FORCE_CPU_ONLY, "false") == "true":
+        runtime = "runc"
+        runtime_changed = _is_runtime_changed(runtime)
+    else:
+        runtime = agent_utils.maybe_update_runtime()
+        runtime_changed = _is_runtime_changed(runtime)
     envs_changes, volumes_changes, new_ca_cert_path = agent_utils.get_options_changes(
         new_envs, new_volumes, ca_cert
     )
     if (
         len(envs_changes) > 0
         or len(volumes_changes) > 0
-        or restart_with_nvidia_runtime
+        or runtime_changed
        or new_ca_cert_path is not None
     ):
         docker_api = docker.from_env()
         container_info = get_container_info()
-        runtime = (
-            "nvidia" if restart_with_nvidia_runtime else container_info["HostConfig"]["Runtime"]
-        )
 
         # TODO: only set true if some NET_CLIENT envs changed
         new_envs[constants._UPDATE_SLY_NET_AFTER_RESTART] = "true"
@@ -262,9 +251,9 @@ def init_envs():
                     for k, v in envs_changes.items()
                 },
                 "volumes_changes": volumes_changes,
-                "runtime_changes": {container_info["HostConfig"]["Runtime"]: runtime}
-                if restart_with_nvidia_runtime
-                else {},
+                "runtime_changes": (
+                    {container_info["HostConfig"]["Runtime"]: runtime} if runtime_changed else {}
+                ),
                 "ca_cert_changed": bool(new_ca_cert_path),
             },
         )
diff --git a/agent/worker/agent_utils.py b/agent/worker/agent_utils.py
index f9a4570..91cab7f 100644
--- a/agent/worker/agent_utils.py
+++ b/agent/worker/agent_utils.py
@@ -52,6 +52,8 @@ class AgentOptionsJsonFields:
     NET_CLIENT_DOCKER_IMAGE = "dockerImage"
     NET_SERVER_PORT = "netServerPort"
     DOCKER_IMAGE = "dockerImage"
+    FORCE_CPU_ONLY = "forceCPUOnly"
+    LOG_LEVEL = "logLevel"
 
 
 def create_img_meta_str(img_size_bytes, width, height):
@@ -584,7 +586,7 @@ def get_agent_options(server_address=None, token=None, timeout=60) -> dict:
     url = constants.PUBLIC_API_SERVER_ADDRESS() + "agents.options.info"
     resp = requests.post(url=url, json={"token": token}, timeout=timeout)
-    if resp.status_code != requests.codes.ok: # pylint: disable=no-member
+    if resp.status_code != requests.codes.ok:  # pylint: disable=no-member
         try:
             text = resp.text
         except:
@@ -601,7 +603,7 @@ def get_instance_version(server_address=None, timeout=60):
         server_address = constants.SERVER_ADDRESS()
     url = constants.PUBLIC_API_SERVER_ADDRESS() + "instance.version"
     resp = requests.get(url=url, timeout=timeout)
-    if resp.status_code != requests.codes.ok: # pylint: disable=no-member
+    if resp.status_code != requests.codes.ok:  # pylint: disable=no-member
         if resp.status_code in (400, 401, 403, 404):
             return None
         try:
@@ -699,9 +701,6 @@ def update_env_param(name, value, default=None):
     )
     update_env_param(constants._HTTPS_PROXY, http_proxy, optional_defaults[constants._HTTPS_PROXY])
     update_env_param(constants._NO_PROXY, no_proxy, optional_defaults[constants._NO_PROXY])
-    # DOCKER_IMAGE
-    # maybe_update_env_param(constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None))
-
     update_env_param(
         constants._NET_CLIENT_DOCKER_IMAGE,
         net_options.get(AgentOptionsJsonFields.NET_CLIENT_DOCKER_IMAGE, None),
@@ -715,6 +714,16 @@ def update_env_param(name, value, default=None):
     update_env_param(
         constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None)
     )
+    update_env_param(
+        constants._FORCE_CPU_ONLY,
+        str(options.get(AgentOptionsJsonFields.FORCE_CPU_ONLY, False)).lower(),
+        optional_defaults[constants._FORCE_CPU_ONLY],
+    )
+    update_env_param(
+        constants._LOG_LEVEL,
+        options.get(AgentOptionsJsonFields.LOG_LEVEL, None),
+        optional_defaults[constants._LOG_LEVEL],
+    )
 
     agent_host_dir = options.get(AgentOptionsJsonFields.AGENT_HOST_DIR, "").strip()
     if agent_host_dir == "":
@@ -782,6 +791,7 @@ def _volumes_changes(volumes) -> dict:
             changes[key] = value
     return changes
 
+
 def _is_bind_attached(container_info, bind_path):
     vols = binds_to_volumes_dict(container_info.get("HostConfig", {}).get("Binds", []))
 
@@ -791,15 +801,17 @@ def _is_bind_attached(container_info, bind_path):
     return False
 
+
 def _copy_file_to_container(container, src, dst_dir: str):
     stream = io.BytesIO()
-    with tarfile.open(fileobj=stream, mode='w|') as tar, open(src, 'rb') as f:
+    with tarfile.open(fileobj=stream, mode="w|") as tar, open(src, "rb") as f:
         info = tar.gettarinfo(fileobj=f)
         info.name = os.path.basename(src)
         tar.addfile(info, f)
 
     container.put_archive(dst_dir, stream.getvalue())
 
+
 def _ca_cert_changed(ca_cert) -> str:
     if ca_cert is None:
         return None
@@ -832,7 +844,12 @@ def _ca_cert_changed(ca_cert) -> str:
     tmp_container = docker_api.containers.create(
         agent_image,
         "",
-        volumes={constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {"bind": constants.SLY_EXTRA_CA_CERTS_DIR(), "mode": "rw"}},
+        volumes={
+            constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {
+                "bind": constants.SLY_EXTRA_CA_CERTS_DIR(),
+                "mode": "rw",
+            }
+        },
     )
     _copy_file_to_container(tmp_container, cert_path, constants.SLY_EXTRA_CA_CERTS_DIR())
 
@@ -958,3 +975,33 @@ def restart_agent(
         "Docker container is spawned",
         extra={"container_id": container.id, "container_name": container.name},
     )
+
+
+def nvidia_runtime_is_available():
+    docker_api = docker.from_env()
+    image = constants.DEFAULT_APP_DOCKER_IMAGE()
+    try:
+        docker_api.containers.run(
+            image,
+            command="nvidia-smi",
+            runtime="nvidia",
+            remove=True,
+        )
+        return True
+    except Exception as e:
+        return False
+
+
+def maybe_update_runtime():
+    container_info = get_container_info()
+    runtime = container_info["HostConfig"]["Runtime"]
+    if runtime == "nvidia":
+        return runtime
+    sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
+    is_available = nvidia_runtime_is_available()
+    if is_available:
+        sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
+        return "nvidia"
+    else:
+        sly.logger.debug("NVIDIA runtime is not available.")
+        return runtime
diff --git a/agent/worker/constants.py b/agent/worker/constants.py
index ed14d46..cd00498 100644
--- a/agent/worker/constants.py
+++ b/agent/worker/constants.py
@@ -6,7 +6,9 @@ import supervisely_lib as sly
 import hashlib
 import re
 
-from supervisely_lib.io.docker_utils import PullPolicy # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.docker_utils import (
+    PullPolicy,
+)  # pylint: disable=import-error, no-name-in-module
 
 
 _SERVER_ADDRESS = "SERVER_ADDRESS"
@@ -99,6 +101,8 @@ def TOKEN():
 _UPDATE_SLY_NET_AFTER_RESTART = "UPDATE_SLY_NET_AFTER_RESTART"
 _DOCKER_IMAGE = "DOCKER_IMAGE"
 _CONTAINER_NAME = "CONTAINER_NAME"
+_FORCE_CPU_ONLY = "FORCE_CPU_ONLY"
+_LOG_LEVEL = "LOG_LEVEL"
 _NET_CLIENT_DOCKER_IMAGE = "NET_CLIENT_DOCKER_IMAGE"
 _NET_SERVER_PORT = "NET_SERVER_PORT"
@@ -163,6 +167,8 @@ def TOKEN():
     _AGENT_RESTART_COUNT: 0,
     _SLY_EXTRA_CA_CERTS_DIR: "/sly_certs",
     _SLY_EXTRA_CA_CERTS_VOLUME_NAME: f"supervisely-agent-ca-certs-{TOKEN()[:8]}",
+    _FORCE_CPU_ONLY: "false",
+    _LOG_LEVEL: "INFO",
 }
@@ -261,7 +267,7 @@ def AGENT_TASKS_DIR():
 
 def AGENT_TASK_SHARED_DIR():
     """default: /sly_agent/tasks/task_shared"""
-    return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) # pylint: disable=no-member
+    return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED)  # pylint: disable=no-member
 
 
 def AGENT_TMP_DIR():
@@ -658,6 +664,7 @@ def AGENT_RESTART_COUNT():
 def SLY_EXTRA_CA_CERTS_DIR():
     return read_optional_setting(_SLY_EXTRA_CA_CERTS_DIR)
 
+
 def SLY_EXTRA_CA_CERTS_FILEPATH():
     return os.path.join(SLY_EXTRA_CA_CERTS_DIR(), "instance_ca_chain.crt")
@@ -666,6 +673,14 @@ def SLY_EXTRA_CA_CERTS_VOLUME_NAME():
     return read_optional_setting(_SLY_EXTRA_CA_CERTS_VOLUME_NAME)
 
 
+def FORCE_CPU_ONLY():
+    return sly.env.flag_from_env(read_optional_setting(_FORCE_CPU_ONLY))
+
+
+def LOG_LEVEL():
+    return read_optional_setting(_LOG_LEVEL)
+
+
 def init_constants():
     sly.fs.mkdir(AGENT_LOG_DIR())
     sly.fs.mkdir(AGENT_TASKS_DIR())
diff --git a/agent/worker/task_app.py b/agent/worker/task_app.py
index aa6b85c..6ae38f0 100644
--- a/agent/worker/task_app.py
+++ b/agent/worker/task_app.py
@@ -21,11 +21,18 @@ import supervisely_lib as sly
 
 from .task_dockerized import TaskDockerized
 
-from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.io.json import flatten_json, modify_keys # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.api.api import SUPERVISELY_TASK_ID # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.api.api import Api # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.io.fs import ( # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.json import (
+    dump_json_file,
+)  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.json import (
+    flatten_json,
+    modify_keys,
+)  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.api.api import (
+    SUPERVISELY_TASK_ID,
+)  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.api.api import Api  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.fs import (  # pylint: disable=import-error, no-name-in-module
     ensure_base_path,
     silent_remove,
     get_file_name,
@@ -34,7 +41,9 @@
     file_exists,
     mkdir,
 )
-from supervisely_lib.io.exception_handlers import handle_exceptions # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.exception_handlers import (
+    handle_exceptions,
+)  # pylint: disable=import-error, no-name-in-module
 
 from worker import constants
 from worker.agent_utils import (
@@ -217,7 +226,7 @@ def init_docker_image(self):
         gpu = GPUFlag.from_config(self.app_config)
         self.docker_runtime = "runc"
-        if gpu is not GPUFlag.skipped:
+        if gpu is not GPUFlag.skipped and not constants.FORCE_CPU_ONLY():
             docker_info = self._docker_api.info()
             nvidia = "nvidia"
             nvidia_available = False
@@ -285,9 +294,9 @@ def read_dockerimage_from_config(self):
                     f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}",
                 )
             )
-            self.info[
-                "docker_image"
-            ] = f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
+            self.info["docker_image"] = (
+                f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
+            )
 
     def is_isolate(self):
         if self.app_config is None:
diff --git a/agent/worker/task_custom.py b/agent/worker/task_custom.py
index 43e4f43..d7e9ff4 100644
--- a/agent/worker/task_custom.py
+++ b/agent/worker/task_custom.py
@@ -4,10 +4,13 @@
 import supervisely_lib as sly
 import os
 
-from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.task.paths import TaskPaths  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.json import (
+    dump_json_file,
+)  # pylint: disable=import-error, no-name-in-module
 
 from worker.task_dockerized import TaskDockerized, TaskStep
+from worker import constants
 
 
 class TaskCustom(TaskDockerized):
@@ -15,13 +18,18 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.action_map = {}
 
-        self.docker_runtime = 'nvidia'
+        if not constants.FORCE_CPU_ONLY():
+            self.docker_runtime = "nvidia"
+        else:
+            self.docker_runtime = "runc"
 
         self.dir_data = os.path.join(self.dir_task, os.path.basename(TaskPaths.DATA_DIR))
         self.dir_results = os.path.join(self.dir_task, os.path.basename(TaskPaths.RESULTS_DIR))
         self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
         self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
-        self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
+        self.config_path2 = os.path.join(
+            self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
+        )
 
     def init_additional(self):
         super().init_additional()
@@ -29,15 +37,15 @@ def init_additional(self):
         sly.fs.mkdir(self.dir_results)
 
     def download_step(self):
-        for model_info in self.info['models']:
-            self.data_mgr.download_nn(model_info['title'], self.dir_model)
+        for model_info in self.info["models"]:
+            self.data_mgr.download_nn(model_info["title"], self.dir_model)
 
         self.logger.info("DOWNLOAD_DATA")
-        dump_json_file(self.info['config'], self.config_path1) # Deprecated 'task_settings.json'
-        dump_json_file(self.info['config'], self.config_path2) # New style task_config.json
+        dump_json_file(self.info["config"], self.config_path1)  # Deprecated 'task_settings.json'
+        dump_json_file(self.info["config"], self.config_path2)  # New style task_config.json
 
-        for pr_info in self.info['projects']:
-            self.data_mgr.download_project(self.dir_data, pr_info['title'])
+        for pr_info in self.info["projects"]:
+            self.data_mgr.download_project(self.dir_data, pr_info["title"])
 
         self.report_step_done(TaskStep.DOWNLOAD)
 
diff --git a/agent/worker/task_inference.py b/agent/worker/task_inference.py
index 2270a7c..c500c71 100644
--- a/agent/worker/task_inference.py
+++ b/agent/worker/task_inference.py
@@ -5,28 +5,34 @@
 import shutil
 
 import supervisely_lib as sly
-from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.task.paths import TaskPaths  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.json import (
+    dump_json_file,
+)  # pylint: disable=import-error, no-name-in-module
 
 from worker.task_dockerized import TaskDockerized, TaskStep
 from worker import agent_utils
+from worker import constants
 
 
 class TaskInference(TaskDockerized):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.action_map = {
-            str(sly.EventType.MODEL_APPLIED): self.upload_anns
-        }
-        self.docker_runtime = 'nvidia'
+        self.action_map = {str(sly.EventType.MODEL_APPLIED): self.upload_anns}
+        if not constants.FORCE_CPU_ONLY():
+            self.docker_runtime = "nvidia"
+        else:
+            self.docker_runtime = "runc"
 
         self.entrypoint = "/workdir/src/inference.py"
 
         self.dir_data = os.path.join(self.dir_task, os.path.basename(TaskPaths.DATA_DIR))
         self.dir_results = os.path.join(self.dir_task, os.path.basename(TaskPaths.RESULTS_DIR))
         self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
         self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
-        self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
+        self.config_path2 = os.path.join(
+            self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
+        )
 
     def init_additional(self):
         super().init_additional()
@@ -36,19 +42,19 @@ def init_additional(self):
 
     def download_step(self):
         self.logger.info("DOWNLOAD_DATA")
-        dump_json_file(self.info['config'], self.config_path1)
-        dump_json_file(self.info['config'], self.config_path2)
+        dump_json_file(self.info["config"], self.config_path1)
+        dump_json_file(self.info["config"], self.config_path2)
 
-        model = agent_utils.get_single_item_or_die(self.info, 'models', 'config')
-        self.data_mgr.download_nn(model['title'], self.dir_model)
+        model = agent_utils.get_single_item_or_die(self.info, "models", "config")
+        self.data_mgr.download_nn(model["title"], self.dir_model)
 
-        project_name = agent_utils.get_single_item_or_die(self.info, 'projects', 'config')['title']
+        project_name = agent_utils.get_single_item_or_die(self.info, "projects", "config")["title"]
         self.data_mgr.download_project(self.dir_data, project_name)
 
-        #@TODO: only for compatibility with old models
-        shutil.move(self.dir_model, self.dir_model + '_delme')
-        shutil.move(os.path.join(self.dir_model + '_delme', model['title']), self.dir_model)
-        sly.fs.remove_dir(self.dir_model + '_delme')
+        # @TODO: only for compatibility with old models
+        shutil.move(self.dir_model, self.dir_model + "_delme")
+        shutil.move(os.path.join(self.dir_model + "_delme", model["title"]), self.dir_model)
+        sly.fs.remove_dir(self.dir_model + "_delme")
 
         self.report_step_done(TaskStep.DOWNLOAD)
@@ -60,6 +66,8 @@ def upload_step(self):
 
     def upload_anns(self, _):
         self.report_step_done(TaskStep.MAIN)
-        self.data_mgr.upload_project(self.dir_results, self.info['projects'][0]['title'], self.info['new_title'])
+        self.data_mgr.upload_project(
+            self.dir_results, self.info["projects"][0]["title"], self.info["new_title"]
+        )
         self.report_step_done(TaskStep.UPLOAD)
         return {}
diff --git a/agent/worker/task_inference_rpc.py b/agent/worker/task_inference_rpc.py
index 4b56bfa..ecd6345 100644
--- a/agent/worker/task_inference_rpc.py
+++ b/agent/worker/task_inference_rpc.py
@@ -5,8 +5,10 @@
 import shutil
 
 import supervisely_lib as sly
-from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
-from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.task.paths import TaskPaths  # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.io.json import (
+    dump_json_file,
+)  # pylint: disable=import-error, no-name-in-module
 
 from worker.task_dockerized import TaskDockerized, TaskStep
 from worker import constants
@@ -16,41 +18,48 @@ class TaskInferenceRPC(TaskDockerized):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.docker_runtime = 'nvidia'
+        if not constants.FORCE_CPU_ONLY():
+            self.docker_runtime = "nvidia"
+        else:
+            self.docker_runtime = "runc"
 
-        #@TODO: need refatoring
+        # @TODO: need refactoring
         self.entrypoint = "/workdir/src/deploy.py"
-        if self.info['task_type'] == 'smarttool':
+        if self.info["task_type"] == "smarttool":
             self.entrypoint = "/workdir/src/deploy_smart.py"
 
         self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
         self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
-        self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
+        self.config_path2 = os.path.join(
+            self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
+        )
 
     def init_additional(self):
         super().init_additional()
         sly.fs.mkdir(self.dir_model)
 
     def download_step(self):
-        if self.info.get('nn_model', None) is None:
-            self.logger.critical('TASK_NN_EMPTY')
-            raise ValueError('TASK_NN_EMPTY')
+        if self.info.get("nn_model", None) is None:
+            self.logger.critical("TASK_NN_EMPTY")
+            raise ValueError("TASK_NN_EMPTY")
 
-        self.data_mgr.download_nn(self.info['nn_model']['title'], self.dir_model)
+        self.data_mgr.download_nn(self.info["nn_model"]["title"], self.dir_model)
 
-        #@TODO: only for compatibility with old models
-        shutil.move(self.dir_model, self.dir_model + '_delme')
-        shutil.move(os.path.join(self.dir_model + '_delme', self.info['nn_model']['title']), self.dir_model)
-        sly.fs.remove_dir(self.dir_model + '_delme')
+        # @TODO: only for compatibility with old models
+        shutil.move(self.dir_model, self.dir_model + "_delme")
+        shutil.move(
+            os.path.join(self.dir_model + "_delme", self.info["nn_model"]["title"]), self.dir_model
+        )
+        sly.fs.remove_dir(self.dir_model + "_delme")
 
         out_cfg = {
-            **self.info['task_settings'],  # settings from server
-            'connection': {
-                'server_address': constants.SERVER_ADDRESS(),
-                'token': constants.TOKEN(),
-                'task_id': str(self.info['task_id']),
+            **self.info["task_settings"],  # settings from server
+            "connection": {
+                "server_address": constants.SERVER_ADDRESS(),
+                "token": constants.TOKEN(),
+                "task_id": str(self.info["task_id"]),
             },
-            'model_settings': self.info['task_settings']
+            "model_settings": self.info["task_settings"],
         }
 
         dump_json_file(out_cfg, self.config_path1)  # Deprecated 'task_settings.json'
diff --git a/agent/worker/task_train.py b/agent/worker/task_train.py
index 5363af0..b22c13e 100644
--- a/agent/worker/task_train.py
+++ b/agent/worker/task_train.py
@@ -4,10 +4,11 @@
 import os.path as osp
 import json
 import supervisely_lib as sly
-from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
+from supervisely_lib.task.paths import TaskPaths  # pylint: disable=import-error, no-name-in-module
 
 from .task_dockerized import TaskDockerized, TaskStep
 from worker import agent_utils
+from worker import constants
 
 import shutil
@@ -15,18 +16,21 @@ class TaskTrain(TaskDockerized):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.action_map = {
-            str(sly.EventType.CHECKPOINT): self.upload_model
-        }
-        self.docker_runtime = 'nvidia'
+        self.action_map = {str(sly.EventType.CHECKPOINT): self.upload_model}
+        if not constants.FORCE_CPU_ONLY():
+            self.docker_runtime = "nvidia"
+        else:
+            self.docker_runtime = "runc"
 
         self.entrypoint = "/workdir/src/train.py"
 
         self.dir_data = os.path.join(self.dir_task, os.path.basename(TaskPaths.DATA_DIR))
         self.dir_results = os.path.join(self.dir_task, os.path.basename(TaskPaths.RESULTS_DIR))
         self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
         self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
-        self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
-        self.dir_tmp = osp.join(self.dir_task, 'tmp')
+        self.config_path2 = os.path.join(
+            self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
+        )
+        self.dir_tmp = osp.join(self.dir_task, "tmp")
 
         self.last_checkpoint = {}
@@ -34,8 +38,10 @@ def init_additional(self):
         super().init_additional()
 
         if not self.data_mgr.has_nn_storage():
-            raise ValueError('Host agent has local neural networks storage disabled. Training without local storage ' +
-                             'is not supported because there is no space to keep checkpoints.')
+            raise ValueError(
+                "Host agent has local neural networks storage disabled. Training without local storage "
+                + "is not supported because there is no space to keep checkpoints."
+            )
 
         sly.fs.mkdir(self.dir_data)
         sly.fs.mkdir(self.dir_tmp)
@@ -44,22 +50,30 @@ def download_step(self):
         self.logger.info("DOWNLOAD_DATA")
-        sly.io.json.dump_json_file(self.info['config'], self.config_path1) # Deprecated 'task_settings.json' # pylint: disable=no-member
-        sly.io.json.dump_json_file(self.info['config'], self.config_path2) # New style task_config.json # pylint: disable=no-member
-
-        if len(self.info['projects']) != 1:
-            raise ValueError("Config contains {} projects. Training works only with single project.".format(len(self.info['projects'])))
-
-        project_name = agent_utils.get_single_item_or_die(self.info, 'projects', 'config')['title']
+        sly.io.json.dump_json_file(
+            self.info["config"], self.config_path1
+        )  # Deprecated 'task_settings.json' # pylint: disable=no-member
+        sly.io.json.dump_json_file(
+            self.info["config"], self.config_path2
+        )  # New style task_config.json # pylint: disable=no-member
+
+        if len(self.info["projects"]) != 1:
+            raise ValueError(
+                "Config contains {} projects. Training works only with single project.".format(
+                    len(self.info["projects"])
+                )
+            )
+
+        project_name = agent_utils.get_single_item_or_die(self.info, "projects", "config")["title"]
         self.data_mgr.download_project(self.dir_data, project_name)
 
-        model = agent_utils.get_single_item_or_die(self.info, 'models', 'config')
-        self.data_mgr.download_nn(model['title'], self.dir_model)
+        model = agent_utils.get_single_item_or_die(self.info, "models", "config")
+        self.data_mgr.download_nn(model["title"], self.dir_model)
 
         # @TODO: only for compatibility with old models
-        shutil.move(self.dir_model, self.dir_model + '_delme')
-        shutil.move(os.path.join(self.dir_model + '_delme', model['title']), self.dir_model)
-        sly.fs.remove_dir(self.dir_model + '_delme')
+        shutil.move(self.dir_model, self.dir_model + "_delme")
+        shutil.move(os.path.join(self.dir_model + "_delme", model["title"]), self.dir_model)
+        sly.fs.remove_dir(self.dir_model + "_delme")
 
         self.report_step_done(TaskStep.DOWNLOAD)
@@ -67,33 +81,33 @@ def before_main_step(self):
         sly.fs.clean_dir(self.dir_results)
 
     def upload_step(self):
-        if 'id' not in self.last_checkpoint or 'hash' not in self.last_checkpoint:
-            raise RuntimeError('No checkpoints produced during training')
-        self.data_mgr.upload_nn(self.last_checkpoint['id'], self.last_checkpoint['hash'])
+        if "id" not in self.last_checkpoint or "hash" not in self.last_checkpoint:
+            raise RuntimeError("No checkpoints produced during training")
+        self.data_mgr.upload_nn(self.last_checkpoint["id"], self.last_checkpoint["hash"])
         self.report_step_done(TaskStep.UPLOAD)
 
     def upload_model(self, extra):
-        model = self.public_api.model.generate_hash(self.info['task_id'])
-        model_id = model['id']
-        model_hash = model['hash']
+        model = self.public_api.model.generate_hash(self.info["task_id"])
+        model_id = model["id"]
+        model_hash = model["hash"]
 
-        log_extra = {'model_id': model_id, 'model_hash': model_hash}
-        self.logger.trace('NEW_MODEL_ID', extra=log_extra)
+        log_extra = {"model_id": model_id, "model_hash": model_hash}
+        self.logger.trace("NEW_MODEL_ID", extra=log_extra)
 
-        cur_checkpoint_dir = os.path.join(self.dir_results, extra['subdir'])
+        cur_checkpoint_dir = os.path.join(self.dir_results, extra["subdir"])
         storage = self.data_mgr.storage.nns
         if storage.check_storage_object(model_hash):
-            self.logger.critical('CHECKPOINT_HASH_ALREADY_EXISTS')
+            self.logger.critical("CHECKPOINT_HASH_ALREADY_EXISTS")
             raise RuntimeError()
         storage.write_object(cur_checkpoint_dir, model_hash)
 
-        model_config_path = osp.join(cur_checkpoint_dir, 'config.json')
+        model_config_path = osp.join(cur_checkpoint_dir, "config.json")
         if osp.isfile(model_config_path):
-            log_extra['model_config'] = json.load(open(model_config_path, 'r'))
+            log_extra["model_config"] = json.load(open(model_config_path, "r"))
         else:
-            log_extra['model_config'] = {}
+            log_extra["model_config"] = {}
 
-        self.logger.info('MODEL_SAVED', extra=log_extra)
+        self.logger.info("MODEL_SAVED", extra=log_extra)
 
         # don't report TaskStep.UPLOAD because there should be multiple uploads
         self.last_checkpoint["id"] = model_id
diff --git a/agent/worker/task_update.py b/agent/worker/task_update.py
index bf322d1..c844429 100644
--- a/agent/worker/task_update.py
+++ b/agent/worker/task_update.py
@@ -80,6 +80,11 @@ def task_main_func(self):
                 "Couldn't find sly-net-client attached to this agent. We'll try to deploy it during the agent restart"
             )
 
+        if envs.get(constants._FORCE_CPU_ONLY, "false") == "true":
+            runtime = "runc"
+        else:
+            runtime = agent_utils.maybe_update_runtime()
+
         # Stop current container
         cur_container_id = container_info["Config"]["Hostname"]
         envs[constants._REMOVE_OLD_AGENT] = cur_container_id
@@ -88,6 +93,7 @@ def task_main_func(self):
             image=image,
             envs=envs,
             volumes=volumes,
+            runtime=runtime,
             ca_cert_path=ca_cert_path,
             docker_api=self._docker_api,
         )