Add FORCE_CPU_ONLY option (#69)

* add force cpu only option

* add debug log

* add debug log

* add log_level option

* force cpu only data type fix

* remove debug logs

* enforce force cpu only option in apps
NikolaiPetukhov authored Feb 20, 2024
1 parent 5de7ac9 commit 30b6ab3
Showing 9 changed files with 235 additions and 130 deletions.
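In short, the commit threads a new per-agent option through three layers: the agents.options.info payload, the agent's environment variables, and the Docker runtime selected for app containers. A minimal sketch of the intended effect (an illustration only, not code from this repository):

def app_runtime(force_cpu_only: bool, nvidia_runtime_available: bool) -> str:
    # With FORCE_CPU_ONLY enabled, apps always run under the default "runc"
    # runtime, even on hosts where the "nvidia" runtime could be used.
    if force_cpu_only:
        return "runc"
    return "nvidia" if nvidia_runtime_available else "runc"

print(app_runtime(force_cpu_only=True, nvidia_runtime_available=True))   # runc
print(app_runtime(force_cpu_only=False, nvidia_runtime_available=True))  # nvidia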
47 changes: 18 additions & 29 deletions agent/main.py
@@ -104,8 +104,12 @@ def _start_net_client(docker_api=None):
]

if constants.SLY_EXTRA_CA_CERTS() and os.path.exists(constants.SLY_EXTRA_CA_CERTS()):
envs.append(f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}")
volumes.append(f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}")
envs.append(
f"{constants._SLY_EXTRA_CA_CERTS}={constants.SLY_EXTRA_CA_CERTS_FILEPATH()}"
)
volumes.append(
f"{constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME()}:{constants.SLY_EXTRA_CA_CERTS_DIR()}"
)

log_config = LogConfig(
type="local", config={"max-size": "1m", "max-file": "1", "compress": "false"}
@@ -178,26 +182,9 @@ def _start_net_client(docker_api=None):
raise e


def _nvidia_runtime_check():
def _is_runtime_changed(new_runtime):
container_info = get_container_info()
runtime = container_info["HostConfig"]["Runtime"]
if runtime == "nvidia":
return False
sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
docker_api = docker.from_env()
image = constants.DEFAULT_APP_DOCKER_IMAGE()
try:
docker_api.containers.run(
image,
command="nvidia-smi",
runtime="nvidia",
remove=True,
)
sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
return True
except Exception as e:
sly.logger.debug("NVIDIA runtime is not available.")
return False
return container_info["HostConfig"]["Runtime"] != new_runtime


def main(args):
@@ -235,21 +222,23 @@ def init_envs():
"Can not update agent options. Agent will be started with current options"
)
return
restart_with_nvidia_runtime = _nvidia_runtime_check()
if new_envs.get(constants._FORCE_CPU_ONLY, "false") == "true":
runtime = "runc"
runtime_changed = _is_runtime_changed(runtime)
else:
runtime = agent_utils.maybe_update_runtime()
runtime_changed = _is_runtime_changed(runtime)
envs_changes, volumes_changes, new_ca_cert_path = agent_utils.get_options_changes(
new_envs, new_volumes, ca_cert
)
if (
len(envs_changes) > 0
or len(volumes_changes) > 0
or restart_with_nvidia_runtime
or runtime_changed
or new_ca_cert_path is not None
):
docker_api = docker.from_env()
container_info = get_container_info()
runtime = (
"nvidia" if restart_with_nvidia_runtime else container_info["HostConfig"]["Runtime"]
)

# TODO: only set true if some NET_CLIENT envs changed
new_envs[constants._UPDATE_SLY_NET_AFTER_RESTART] = "true"
@@ -262,9 +251,9 @@ def init_envs():
for k, v in envs_changes.items()
},
"volumes_changes": volumes_changes,
"runtime_changes": {container_info["HostConfig"]["Runtime"]: runtime}
if restart_with_nvidia_runtime
else {},
"runtime_changes": (
{container_info["HostConfig"]["Runtime"]: runtime} if runtime_changed else {}
),
"ca_cert_changed": bool(new_ca_cert_path),
},
)
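The main.py changes replace the old _nvidia_runtime_check with a simpler comparison: pick the desired runtime first (forced "runc", or whatever agent_utils.maybe_update_runtime returns), then restart only if it differs from the container's current runtime. A condensed sketch of that decision under assumed simplifications (the helper below is illustrative, not the commit's exact API):

def choose_runtime(new_envs: dict, current_runtime: str, nvidia_available: bool) -> str:
    # FORCE_CPU_ONLY wins outright and maps to the default "runc" runtime.
    if new_envs.get("FORCE_CPU_ONLY", "false") == "true":
        return "runc"
    # Otherwise prefer "nvidia" when a probe container can run nvidia-smi.
    return "nvidia" if nvidia_available else current_runtime

current = "runc"
desired = choose_runtime({"FORCE_CPU_ONLY": "false"}, current, nvidia_available=True)
runtime_changed = desired != current  # True here, so the agent restarts with runtime="nvidia"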
61 changes: 54 additions & 7 deletions agent/worker/agent_utils.py
@@ -52,6 +52,8 @@ class AgentOptionsJsonFields:
NET_CLIENT_DOCKER_IMAGE = "dockerImage"
NET_SERVER_PORT = "netServerPort"
DOCKER_IMAGE = "dockerImage"
FORCE_CPU_ONLY = "forceCPUOnly"
LOG_LEVEL = "logLevel"


def create_img_meta_str(img_size_bytes, width, height):
@@ -584,7 +586,7 @@ def get_agent_options(server_address=None, token=None, timeout=60) -> dict:

url = constants.PUBLIC_API_SERVER_ADDRESS() + "agents.options.info"
resp = requests.post(url=url, json={"token": token}, timeout=timeout)
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
try:
text = resp.text
except:
@@ -601,7 +603,7 @@ def get_instance_version(server_address=None, timeout=60):
server_address = constants.SERVER_ADDRESS()
url = constants.PUBLIC_API_SERVER_ADDRESS() + "instance.version"
resp = requests.get(url=url, timeout=timeout)
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code != requests.codes.ok: # pylint: disable=no-member
if resp.status_code in (400, 401, 403, 404):
return None
try:
@@ -699,9 +701,6 @@ def update_env_param(name, value, default=None):
)
update_env_param(constants._HTTPS_PROXY, http_proxy, optional_defaults[constants._HTTPS_PROXY])
update_env_param(constants._NO_PROXY, no_proxy, optional_defaults[constants._NO_PROXY])
# DOCKER_IMAGE
# maybe_update_env_param(constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None))

update_env_param(
constants._NET_CLIENT_DOCKER_IMAGE,
net_options.get(AgentOptionsJsonFields.NET_CLIENT_DOCKER_IMAGE, None),
@@ -715,6 +714,16 @@
update_env_param(
constants._DOCKER_IMAGE, options.get(AgentOptionsJsonFields.DOCKER_IMAGE, None)
)
update_env_param(
constants._FORCE_CPU_ONLY,
str(options.get(AgentOptionsJsonFields.FORCE_CPU_ONLY, False)).lower(),
optional_defaults[constants._FORCE_CPU_ONLY],
)
update_env_param(
constants._LOG_LEVEL,
options.get(AgentOptionsJsonFields.LOG_LEVEL, None),
optional_defaults[constants._LOG_LEVEL],
)

agent_host_dir = options.get(AgentOptionsJsonFields.AGENT_HOST_DIR, "").strip()
if agent_host_dir == "":
@@ -782,6 +791,7 @@ def _volumes_changes(volumes) -> dict:
changes[key] = value
return changes


def _is_bind_attached(container_info, bind_path):
vols = binds_to_volumes_dict(container_info.get("HostConfig", {}).get("Binds", []))

@@ -791,15 +801,17 @@ def _is_bind_attached(container_info, bind_path):

return False


def _copy_file_to_container(container, src, dst_dir: str):
stream = io.BytesIO()
with tarfile.open(fileobj=stream, mode='w|') as tar, open(src, 'rb') as f:
with tarfile.open(fileobj=stream, mode="w|") as tar, open(src, "rb") as f:
info = tar.gettarinfo(fileobj=f)
info.name = os.path.basename(src)
tar.addfile(info, f)

container.put_archive(dst_dir, stream.getvalue())


def _ca_cert_changed(ca_cert) -> str:
if ca_cert is None:
return None
@@ -832,7 +844,12 @@ def _ca_cert_changed(ca_cert) -> str:
tmp_container = docker_api.containers.create(
agent_image,
"",
volumes={constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {"bind": constants.SLY_EXTRA_CA_CERTS_DIR(), "mode": "rw"}},
volumes={
constants.SLY_EXTRA_CA_CERTS_VOLUME_NAME(): {
"bind": constants.SLY_EXTRA_CA_CERTS_DIR(),
"mode": "rw",
}
},
)

_copy_file_to_container(tmp_container, cert_path, constants.SLY_EXTRA_CA_CERTS_DIR())
@@ -958,3 +975,33 @@ def restart_agent(
"Docker container is spawned",
extra={"container_id": container.id, "container_name": container.name},
)


def nvidia_runtime_is_available():
docker_api = docker.from_env()
image = constants.DEFAULT_APP_DOCKER_IMAGE()
try:
docker_api.containers.run(
image,
command="nvidia-smi",
runtime="nvidia",
remove=True,
)
return True
except Exception as e:
return False


def maybe_update_runtime():
container_info = get_container_info()
runtime = container_info["HostConfig"]["Runtime"]
if runtime == "nvidia":
return runtime
sly.logger.debug("NVIDIA runtime is not enabled. Checking if it can be enabled...")
is_available = nvidia_runtime_is_available()
if is_available:
sly.logger.debug("NVIDIA runtime is available. Will restart Agent with NVIDIA runtime.")
return "nvidia"
else:
sly.logger.debug("NVIDIA runtime is not available.")
return runtime
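
Taken together, the agent_utils.py changes map two new fields of the agents.options.info response onto environment variables. An illustrative sketch of that mapping (the payload shape is assumed from AgentOptionsJsonFields above; the "or" fallback approximates the optional default):

options = {"forceCPUOnly": True, "logLevel": "DEBUG"}  # assumed response fragment

new_envs = {}
# str(True).lower() -> "true": this mirrors the "force cpu only data type fix"
# commit, which normalizes the boolean into a lowercase string env value.
new_envs["FORCE_CPU_ONLY"] = str(options.get("forceCPUOnly", False)).lower()
new_envs["LOG_LEVEL"] = options.get("logLevel") or "INFO"
assert new_envs == {"FORCE_CPU_ONLY": "true", "LOG_LEVEL": "DEBUG"}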
19 changes: 17 additions & 2 deletions agent/worker/constants.py
@@ -6,7 +6,9 @@
import supervisely_lib as sly
import hashlib
import re
from supervisely_lib.io.docker_utils import PullPolicy # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.docker_utils import (
PullPolicy,
) # pylint: disable=import-error, no-name-in-module


_SERVER_ADDRESS = "SERVER_ADDRESS"
@@ -99,6 +101,8 @@ def TOKEN():
_UPDATE_SLY_NET_AFTER_RESTART = "UPDATE_SLY_NET_AFTER_RESTART"
_DOCKER_IMAGE = "DOCKER_IMAGE"
_CONTAINER_NAME = "CONTAINER_NAME"
_FORCE_CPU_ONLY = "FORCE_CPU_ONLY"
_LOG_LEVEL = "LOG_LEVEL"

_NET_CLIENT_DOCKER_IMAGE = "NET_CLIENT_DOCKER_IMAGE"
_NET_SERVER_PORT = "NET_SERVER_PORT"
@@ -163,6 +167,8 @@ def TOKEN():
_AGENT_RESTART_COUNT: 0,
_SLY_EXTRA_CA_CERTS_DIR: "/sly_certs",
_SLY_EXTRA_CA_CERTS_VOLUME_NAME: f"supervisely-agent-ca-certs-{TOKEN()[:8]}",
_FORCE_CPU_ONLY: "false",
_LOG_LEVEL: "INFO",
}


@@ -261,7 +267,7 @@ def AGENT_TASKS_DIR():

def AGENT_TASK_SHARED_DIR():
"""default: /sly_agent/tasks/task_shared"""
return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) # pylint: disable=no-member
return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) # pylint: disable=no-member


def AGENT_TMP_DIR():
@@ -658,6 +664,7 @@ def AGENT_RESTART_COUNT():
def SLY_EXTRA_CA_CERTS_DIR():
return read_optional_setting(_SLY_EXTRA_CA_CERTS_DIR)


def SLY_EXTRA_CA_CERTS_FILEPATH():
return os.path.join(SLY_EXTRA_CA_CERTS_DIR(), "instance_ca_chain.crt")

@@ -666,6 +673,14 @@ def SLY_EXTRA_CA_CERTS_VOLUME_NAME():
return read_optional_setting(_SLY_EXTRA_CA_CERTS_VOLUME_NAME)


def FORCE_CPU_ONLY():
return sly.env.flag_from_env(read_optional_setting(_FORCE_CPU_ONLY))


def LOG_LEVEL():
return read_optional_setting(_LOG_LEVEL)


def init_constants():
sly.fs.mkdir(AGENT_LOG_DIR())
sly.fs.mkdir(AGENT_TASKS_DIR())
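The new constants read like the existing optional settings: an env var with a registered default. A small stand-in sketch of that behavior (the truthy-string parsing approximates sly.env.flag_from_env and is an assumption, not the library's exact rule):

import os

_DEFAULTS = {"FORCE_CPU_ONLY": "false", "LOG_LEVEL": "INFO"}

def read_optional(name: str, env=os.environ) -> str:
    # Stand-in for worker.constants.read_optional_setting with its defaults dict.
    return env.get(name, _DEFAULTS[name])

def force_cpu_only(env=os.environ) -> bool:
    return read_optional("FORCE_CPU_ONLY", env).strip().lower() in ("1", "true", "yes")

def log_level(env=os.environ) -> str:
    return read_optional("LOG_LEVEL", env)

# With neither variable set, GPU scheduling stays enabled and logging stays at INFO.
print(force_cpu_only(env={}), log_level(env={}))  # False INFO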
29 changes: 19 additions & 10 deletions agent/worker/task_app.py
@@ -21,11 +21,18 @@

import supervisely_lib as sly
from .task_dockerized import TaskDockerized
from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import flatten_json, modify_keys # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import SUPERVISELY_TASK_ID # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import Api # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.fs import ( # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
dump_json_file,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
flatten_json,
modify_keys,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import (
SUPERVISELY_TASK_ID,
) # pylint: disable=import-error, no-name-in-module
from supervisely_lib.api.api import Api # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.fs import ( # pylint: disable=import-error, no-name-in-module
ensure_base_path,
silent_remove,
get_file_name,
@@ -34,7 +41,9 @@
file_exists,
mkdir,
)
from supervisely_lib.io.exception_handlers import handle_exceptions # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.exception_handlers import (
handle_exceptions,
) # pylint: disable=import-error, no-name-in-module

from worker import constants
from worker.agent_utils import (
@@ -217,7 +226,7 @@ def init_docker_image(self):
gpu = GPUFlag.from_config(self.app_config)
self.docker_runtime = "runc"

if gpu is not GPUFlag.skipped:
if gpu is not GPUFlag.skipped and not constants.FORCE_CPU_ONLY():
docker_info = self._docker_api.info()
nvidia = "nvidia"
nvidia_available = False
@@ -285,9 +294,9 @@ def read_dockerimage_from_config(self):
f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}",
)
)
self.info[
"docker_image"
] = f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
self.info["docker_image"] = (
f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{self.info['docker_image']}"
)

def is_isolate(self):
if self.app_config is None:
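In task_app.py the new constant gates the GPU branch of init_docker_image. A condensed sketch of the decision (GPUFlag is a simplified stand-in; error handling for apps that require a GPU is omitted):

from enum import Enum

class GPUFlag(Enum):  # simplified stand-in for the agent's GPUFlag
    skipped = "skipped"
    preferred = "preferred"
    required = "required"

def select_app_runtime(gpu: GPUFlag, nvidia_in_docker_info: bool, force_cpu_only: bool) -> str:
    runtime = "runc"
    # FORCE_CPU_ONLY now short-circuits the GPU branch even for GPU-capable apps.
    if gpu is not GPUFlag.skipped and not force_cpu_only:
        if nvidia_in_docker_info:
            runtime = "nvidia"
    return runtime

print(select_app_runtime(GPUFlag.required, nvidia_in_docker_info=True, force_cpu_only=True))  # runc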
28 changes: 18 additions & 10 deletions agent/worker/task_custom.py
@@ -4,40 +4,48 @@
import supervisely_lib as sly
import os

from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import dump_json_file # pylint: disable=import-error, no-name-in-module
from supervisely_lib.task.paths import TaskPaths # pylint: disable=import-error, no-name-in-module
from supervisely_lib.io.json import (
dump_json_file,
) # pylint: disable=import-error, no-name-in-module

from worker.task_dockerized import TaskDockerized, TaskStep
from worker import constants


class TaskCustom(TaskDockerized):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.action_map = {}
self.docker_runtime = 'nvidia'
if not constants.FORCE_CPU_ONLY():
self.docker_runtime = "nvidia"
else:
self.docker_runtime = "runc"

self.dir_data = os.path.join(self.dir_task, os.path.basename(TaskPaths.DATA_DIR))
self.dir_results = os.path.join(self.dir_task, os.path.basename(TaskPaths.RESULTS_DIR))
self.dir_model = os.path.join(self.dir_task, os.path.basename(TaskPaths.MODEL_DIR))
self.config_path1 = os.path.join(self.dir_task, os.path.basename(TaskPaths.SETTINGS_PATH))
self.config_path2 = os.path.join(self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH))
self.config_path2 = os.path.join(
self.dir_task, os.path.basename(TaskPaths.TASK_CONFIG_PATH)
)

def init_additional(self):
super().init_additional()
sly.fs.mkdir(self.dir_data)
sly.fs.mkdir(self.dir_results)

def download_step(self):
for model_info in self.info['models']:
self.data_mgr.download_nn(model_info['title'], self.dir_model)
for model_info in self.info["models"]:
self.data_mgr.download_nn(model_info["title"], self.dir_model)

self.logger.info("DOWNLOAD_DATA")
dump_json_file(self.info['config'], self.config_path1) # Deprecated 'task_settings.json'
dump_json_file(self.info['config'], self.config_path2) # New style task_config.json
dump_json_file(self.info["config"], self.config_path1) # Deprecated 'task_settings.json'
dump_json_file(self.info["config"], self.config_path2) # New style task_config.json

for pr_info in self.info['projects']:
self.data_mgr.download_project(self.dir_data, pr_info['title'])
for pr_info in self.info["projects"]:
self.data_mgr.download_project(self.dir_data, pr_info["title"])

self.report_step_done(TaskStep.DOWNLOAD)

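Note that TaskCustom, unlike TaskApp, does not probe Docker for the nvidia runtime; its new constructor branch reduces to a single expression (equivalent sketch, not a proposed change):

def custom_task_runtime(force_cpu_only: bool) -> str:
    # Equivalent to the new branch in TaskCustom.__init__; "nvidia" is assumed
    # whenever CPU-only mode is not forced, with no availability check.
    return "runc" if force_cpu_only else "nvidia"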
(Changes to the remaining 4 of the 9 files are not shown above.)
