diff --git a/agent/worker/agent.py b/agent/worker/agent.py index b2da744..7fdde64 100644 --- a/agent/worker/agent.py +++ b/agent/worker/agent.py @@ -253,6 +253,7 @@ def agent_connect_initially(self): ), # for debug "agent_version": config.get("Labels", {}).get("VERSION", "agent:6.999.0"), # for debug "agent_image_digest": get_self_docker_image_digest(), + "server_address": constants.SERVER_ADDRESS(), "environ": { constants._SUPERVISELY_AGENT_FILES: constants.SUPERVISELY_AGENT_FILES(), constants._DOCKER_NET: constants.DOCKER_NET(), diff --git a/agent/worker/agent_utils.py b/agent/worker/agent_utils.py index ba95503..6723c43 100644 --- a/agent/worker/agent_utils.py +++ b/agent/worker/agent_utils.py @@ -48,6 +48,7 @@ class AgentOptionsJsonFields: PULL_POLICY = "pullPolicy" SUPERVISELY_AGENT_FILES = "slyAppsDataHostDir" NO_PROXY = "noProxy" + CPU_LIMIT = "cpuLimit" MEM_LIMIT = "memLimit" HTTP_PROXY = "httpProxy" SECURITY_OPT = "securityOpts" @@ -702,6 +703,11 @@ def update_env_param(name, value, default=None): options.get(AgentOptionsJsonFields.PULL_POLICY, None), optional_defaults[constants._PULL_POLICY], ) + update_env_param( + constants._CPU_LIMIT, + options.get(AgentOptionsJsonFields.CPU_LIMIT, None), + optional_defaults[constants._CPU_LIMIT], + ) update_env_param( constants._MEM_LIMIT, options.get(AgentOptionsJsonFields.MEM_LIMIT, None), @@ -1046,3 +1052,8 @@ def maybe_update_runtime(): else: sly.logger.debug("NVIDIA runtime is not available.") return runtime + + +def convert_millicores_to_cpu_quota(millicores, cpu_period=100000): + cpu_quota = (millicores / 1000) * cpu_period + return int(cpu_quota) diff --git a/agent/worker/constants.py b/agent/worker/constants.py index a03623d..fbc9e53 100644 --- a/agent/worker/constants.py +++ b/agent/worker/constants.py @@ -54,8 +54,7 @@ def TOKEN(): _SLY_EXTRA_CA_CERTS = "SLY_EXTRA_CA_CERTS" # container limits -_CPU_PERIOD = "CPU_PERIOD" -_CPU_QUOTA = "CPU_QUOTA" +_CPU_LIMIT = "CPU_LIMIT" _MEM_LIMIT = "MEM_LIMIT" _SHM_SIZE = "SHM_SIZE" @@ -126,8 +125,7 @@ def TOKEN(): _HTTPS_PROXY: "", _NO_PROXY: "", _PUBLIC_API_RETRY_LIMIT: 100, - _CPU_PERIOD: None, - _CPU_QUOTA: None, + _CPU_LIMIT: None, _MEM_LIMIT: None, _PULL_POLICY: str(PullPolicy.IF_AVAILABLE), # str(PullPolicy.NEVER), _GIT_LOGIN: None, @@ -382,18 +380,10 @@ def PUBLIC_API_RETRY_LIMIT(): return int(read_optional_setting(_PUBLIC_API_RETRY_LIMIT)) -def CPU_PERIOD(): - val = read_optional_setting(_CPU_PERIOD) - if val is None: - return val - else: - return int(val) - - -def CPU_QUOTA(): - val = read_optional_setting(_CPU_QUOTA) - if val is None: - return val +def CPU_LIMIT(): + val = read_optional_setting(_CPU_LIMIT) + if val is None or val.strip() == "": + return None else: return int(val) diff --git a/agent/worker/task_app.py b/agent/worker/task_app.py index 7ed583b..82ae356 100644 --- a/agent/worker/task_app.py +++ b/agent/worker/task_app.py @@ -105,6 +105,7 @@ def __init__(self, *args, **kwargs): self._need_sync_pip_cache = False self._requirements_path_relative = None self.host_data_dir = None + self.tmp_data_dir = None self.data_dir = None self.agent_id = None self._gpu_config: Optional[GPUFlag] = None @@ -303,6 +304,17 @@ def is_isolate(self): raise RuntimeError("App config is not initialized") return True # self.app_config.get(_ISOLATE, True) + def clean_task_dir(self): + super().clean_task_dir() + + tmp_data_dir = os.path.join( + constants.SUPERVISELY_AGENT_FILES_CONTAINER(), + "app_tmp_data", str(self.info["task_id"]) + ) + + if sly.fs.dir_exists(tmp_data_dir): + remove_dir(tmp_data_dir) + def _get_task_volumes(self): res = {} res[self.dir_task_host] = {"bind": self.dir_task_container, "mode": "rw"} @@ -394,6 +406,30 @@ def _get_task_volumes(self): "mode": mode, } + useTmpFromFiles = self.info.get("useTmpFromFiles", False) + + if useTmpFromFiles is True: + relative_app_tmp_data_dir = os.path.join( + "app_tmp_data", str(self.info["task_id"]) + ) + + host_tmp_data_dir = os.path.join( + constants.SUPERVISELY_AGENT_FILES(), + relative_app_tmp_data_dir, + ) + + self.tmp_data_dir = os.path.join( + constants.SUPERVISELY_AGENT_FILES_CONTAINER(), + relative_app_tmp_data_dir, + ) + + res[host_tmp_data_dir] = {"bind": "/tmp", "mode": "rw"} + + if sly.fs.dir_exists(self.tmp_data_dir): + remove_dir(self.tmp_data_dir) + + mkdir(self.tmp_data_dir) + return res def download_step(self): @@ -636,7 +672,14 @@ def main_step(self): self.find_or_run_container() self.exec_command(add_envs=self.main_step_envs()) self.process_logs() - self.drop_container_and_check_status() + + try: + self.drop_container_and_check_status() + except: + if self.tmp_data_dir is not None and sly.fs.dir_exists(self.tmp_data_dir): + remove_dir(self.tmp_data_dir) + raise + # if exit_code != 0 next code will never execute if self.data_dir is not None and sly.fs.dir_exists(self.data_dir): parent_app_dir = Path(self.data_dir).parent diff --git a/agent/worker/task_dockerized.py b/agent/worker/task_dockerized.py index bfb8ea1..7847d41 100644 --- a/agent/worker/task_dockerized.py +++ b/agent/worker/task_dockerized.py @@ -19,6 +19,7 @@ filter_log_line, pip_req_satisfied_filter, post_get_request_filter, + convert_millicores_to_cpu_quota, ) from worker.task_sly import TaskSly @@ -226,6 +227,17 @@ def spawn_container(self, add_envs=None, add_labels=None, entrypoint_func=None): self._container_name = "sly_task_{}_{}".format( self.info["task_id"], constants.TASKS_DOCKER_LABEL() ) + + cpu_quota = self.info.get("limits", {}).get("cpu", None) + if cpu_quota is None: + cpu_quota = constants.CPU_LIMIT() + if cpu_quota is not None: + cpu_quota = convert_millicores_to_cpu_quota(cpu_quota) + + mem_limit = self.info.get("limits", {}).get("memory", None) + if mem_limit is None: + mem_limit = constants.MEM_LIMIT() + self._container = self._docker_api.containers.run( self.docker_image_name, runtime=self.docker_runtime, @@ -244,10 +256,9 @@ def spawn_container(self, add_envs=None, add_labels=None, entrypoint_func=None): shm_size=constants.SHM_SIZE(), stdin_open=False, tty=False, - cpu_period=constants.CPU_PERIOD(), - cpu_quota=constants.CPU_QUOTA(), - mem_limit=constants.MEM_LIMIT(), - memswap_limit=constants.MEM_LIMIT(), + cpu_quota=cpu_quota, + mem_limit=mem_limit, + memswap_limit=mem_limit, network=constants.DOCKER_NET(), ipc_mode=ipc_mode, security_opt=constants.SECURITY_OPT(),