From 0154685f5248e8b9da303f4ed79bc3ab9d0190b2 Mon Sep 17 00:00:00 2001 From: TheoLisin <87002239+TheoLisin@users.noreply.github.com> Date: Wed, 13 Sep 2023 15:25:57 +0300 Subject: [PATCH] Exception logging and cache cleaning (#34) * devcontainer settings example * devcontainer example * upd sfk * upd sdk * devcontainer comments * added report parser to handle DialogWindowError * debug msg * process DialogWindowError * fix unmutable mutations bug * added agent exceptions handler and docker pull exceptions * rm import * change exception code * change host mkfile to agent * launch configurations for devcontainer * change paths from host to mounted agent paths * added nvsmi info: capability, cuda, driver * updates for development in devcontainer * added comments and all app folders * all folders size + comments * add paths descriptions * not reusable app data removal * auto clean constants * app sessions and agent logs clean by age functions * added AppDirCleaner for session/pip files remove * fix description * added cleaning for terminated tasks * files and folders separation * comments * files and folders separation * added autocleaner task * prevent raising exception in thread; fix bug * added new clean tasks * rm clean pip cache from app session cleaner * pytest config; test autocleaner * base test functions updated * upd device capability * just my code * rename weights * fix import * change log condition * age_limit typing * AppDirCleaner inplace initialization * rename node storage * fix test path * files removal for not existing in app_sessions * node_storage nodes naming * added removing of unknow sessions files * check unknown sessions * fix ValueError bug * upd auto clean range with env * upd sdk * upd sdk * upd docs * upd constant docs * upd sdk * prevent autoclean app files * allow manual clean for users (update apps once a day) * upd sdk * mv docker exceptions to sdk * fix comment * add comment * upd sdk --- .devcontainer/Dockerfile | 32 +++ .devcontainer/create-net.sh | 101 ++++++++ .devcontainer/devcontainer.json | 33 +++ .vscode/launch.json | 26 +++ Dockerfile | 4 +- agent/worker/agent.py | 39 +++- agent/worker/agent_utils.py | 260 +++++++++++++++++++-- agent/worker/constants.py | 35 +++ agent/worker/system_info.py | 13 +- agent/worker/task_app.py | 28 ++- agent/worker/task_clean_node.py | 113 ++++++--- agent/worker/task_dockerized.py | 35 +++ agent/worker/telemetry_reporter.py | 37 ++- pytest.ini | 4 + requirements.txt | 8 +- tests/clean_functions/conftest.py | 110 +++++++++ tests/clean_functions/test_autoclean.py | 132 +++++++++++ tests/clean_functions/test_client_clean.py | 96 ++++++++ 18 files changed, 1034 insertions(+), 72 deletions(-) create mode 100644 .devcontainer/Dockerfile create mode 100644 .devcontainer/create-net.sh create mode 100644 .devcontainer/devcontainer.json create mode 100644 pytest.ini create mode 100644 tests/clean_functions/conftest.py create mode 100644 tests/clean_functions/test_autoclean.py create mode 100644 tests/clean_functions/test_client_clean.py diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..ba3dcdb --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,32 @@ +FROM supervisely/agent:latest + +# use same name in devcontainer.json +ARG USERNAME=fedor +# execute `echo $UID` on host +ARG USER_UID=1003 +ARG USER_GID=$USER_UID +# execute `getent group docker` on host +ARG DOCKER_UID=999 + +RUN groupadd --gid $USER_GID $USERNAME \ + && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME \ + # 
+ # [Optional] Add sudo support. Omit if you don't need to install software after connecting. + && apt-get update \ + && apt-get install -y sudo \ + && echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME \ + && chmod 0440 /etc/sudoers.d/$USERNAME + +RUN groupadd -g $DOCKER_UID docker \ + && usermod -aG docker $USERNAME + +# AGENT_ROOT_DIR +RUN mkdir /sly_agent +# SUPERVISELY_AGENT_FILES_CONTAINER +RUN mkdir -p /app/sly-files +# uncomment and uninstall to debug sdk +# RUN pip3 uninstall -y supervisely + +# if no User defined all files changed in devcontainer will have root as owner +USER $USERNAME +ENTRYPOINT ["/bin/bash"] \ No newline at end of file diff --git a/.devcontainer/create-net.sh b/.devcontainer/create-net.sh new file mode 100644 index 0000000..324dffc --- /dev/null +++ b/.devcontainer/create-net.sh @@ -0,0 +1,101 @@ + +set -o pipefail + +WHITE='\033[1;37m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' + +command_exists() { + command -v "$@" > /dev/null 2>&1 +} + +sudo_cmd=() + +if command_exists sudo; then + sudo_cmd=(sudo -E bash -c) +elif command_exists su; then + sudo_cmd=(su -p -s bash -c) +fi + +docker ps > /dev/null 2>&1 + +access_test_code=$? + +if [[ ${access_test_code} -ne 0 && ${EUID} -ne 0 && ${#sudo_cmd} -ne 0 ]]; then + cur_fd="$(printf %q "$BASH_SOURCE")$((($#)) && printf ' %q' "$@")" + cur_script=$(cat "${cur_fd}") + ${sudo_cmd[*]} "${cur_script}" + + exit 0 +fi + +export SUPERVISELY_AGENT_IMAGE='supervisely/agent:dev' +# same as in devcontainer.json and debug.env +export AGENT_HOST_DIR="/home/fedor_lisin/agent_debug_dir/agent" + +# from secret.env ↓ +export ACCESS_TOKEN='' +export SERVER_ADDRESS='' +export DOCKER_REGISTRY='' +export DOCKER_LOGIN='' +export DOCKER_PASSWORD='' +# from secret.env ↑ + +secrets=("${ACCESS_TOKEN}" "${SERVER_ADDRESS}" "${DOCKER_REGISTRY}" "${DOCKER_LOGIN}" "${DOCKER_PASSWORD}") + +for value in "${secrets[@]}" +do + if [ -z $value ]; + then + echo "${RED}One of the required secrets is not defined${NC}" + exit 1 + fi +done + + +export DELETE_TASK_DIR_ON_FINISH='true' +export DELETE_TASK_DIR_ON_FAILURE='true' +export PULL_POLICY='ifnotpresent' +# same as in devcontainer.json and debug.env +export SUPERVISELY_AGENT_FILES=$(echo -n "/home/fedor_lisin/agent_debug_dir/files") + + +echo 'Supervisely Net is enabled, starting client...' +docker pull supervisely/sly-net-client:latest +docker network create "supervisely-net-${ACCESS_TOKEN}" 2> /dev/null +echo 'Remove existing Net Client container if any...' +docker rm -fv $(docker ps -aq -f name="supervisely-net-client-${ACCESS_TOKEN}") 2> /dev/null +docker run -it -d --name="supervisely-net-client-${ACCESS_TOKEN}" \ + -e "SLY_NET_CLIENT_PING_INTERVAL=60" \ + --privileged \ + --network "supervisely-net-${ACCESS_TOKEN}" \ + --restart=unless-stopped \ + --log-driver=local \ + --log-opt max-size=1m \ + --log-opt max-file=1 \ + --log-opt compress=false \ + --cap-add NET_ADMIN \ + --device /dev/net/tun:/dev/net/tun \ + \ + \ + \ + \ + \ + \ + -v /var/run/docker.sock:/tmp/docker.sock:ro \ + -v "${AGENT_HOST_DIR}:/app/sly" \ + \ + -v "/home/fedor_lisin/agent_debug_dir/files:/app/sly-files" \ + "supervisely/sly-net-client:latest" \ + "${ACCESS_TOKEN}" \ + "https://dev.supervisely.com/net/" \ + "dev.supervisely.com:51822" + +retVal=$? +if [ $retVal -ne 0 ]; then + echo -e " +${RED}Couldn't start Supervisely Net. Agent is running fine. 
Please, contact support and attach the log above${NC}" +fi + +echo -e "${WHITE}============ You can close this terminal safely now ============${NC}" \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..e797458 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,33 @@ +{ + // "image": "agent_dev:latest", + "build": { + "dockerfile": "Dockerfile" + }, + "runArgs": [ + "--gpus", + "all", + "--ipc=host", + "--net=host", + "--cap-add", + "NET_ADMIN" + ], + "containerEnv": { + // access token from secret.env + "DOCKER_NET": "supervisely-net-{access token}" + }, + "mounts": [ + "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind", + // AGENT_HOST_DIR and AGENT_ROOT_DIR in env file; AGENT_ROOT_DIR should be created in Dockerfile + "source=/home/fedor_lisin/agent_debug_dir/agent,target=/sly_agent,type=bind", + // SUPERVISELY_AGENT_FILES and SUPERVISELY_AGENT_FILES_CONTAINER in env file; SUPERVISELY_AGENT_FILES_CONTAINER should be created in Dockerfile + "source=/home/fedor_lisin/agent_debug_dir/files,target=/app/sly-files,type=bind" + ], + "remoteUser": "fedor", + "customizations": { + "vscode": { + "extensions": [ + "ms-python.python" + ] + } + } +} \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 8d4aa25..85601ba 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -16,6 +16,32 @@ "env": { "PYTHONPATH": "${workspaceFolder}:${PYTHONPATH}" } + }, + { + "name": "devcontainer main.py", + "type": "python", + "request": "launch", + "program": "agent/main.py", + "console": "integratedTerminal", + "justMyCode": false, + "env": { + "PYTHONPATH": "${workspaceFolder}:${PYTHONPATH}" + }, + // app session creates __pycache__ after task ending; + // __pycache__ owner is root, so agent need to be launched with sudo setting + // inside devcontainer otherwise it crashes on task stop + "sudo": true + }, + { + "name": "Python: Debug Tests", + "type": "python", + "request": "launch", + "program": "${file}", + "purpose": [ + "debug-test" + ], + "console": "integratedTerminal", + "justMyCode": false } ] } \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 52b94e9..5e9f6dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -60,7 +60,9 @@ RUN pip install requests-toolbelt>=1.0.0 RUN pip install torch==1.7.1+cu110 torchvision==0.8.2+cu110 -f https://download.pytorch.org/whl/torch_stable.html -RUN pip install supervisely==6.72.71 +RUN pip install supervisely==6.72.127 +# for development +# RUN pip install git+https://github.com/supervisely/supervisely.git@minor-improvements COPY . /workdir diff --git a/agent/worker/agent.py b/agent/worker/agent.py index 5348c3e..47a01c8 100644 --- a/agent/worker/agent.py +++ b/agent/worker/agent.py @@ -4,12 +4,16 @@ import docker import json import threading -from concurrent.futures import ThreadPoolExecutor, wait import subprocess import os -import supervisely_lib as sly import uuid import warnings +from concurrent.futures import ThreadPoolExecutor, wait +from pathlib import Path + +import supervisely_lib as sly + +from worker.agent_utils import TaskDirCleaner, AppDirCleaner warnings.filterwarnings(action="ignore", category=UserWarning) @@ -191,6 +195,12 @@ def stop_task(self, task_id): "Task could not be stopped. 
Not found", extra={"task_id": task_id} ) + dir_task = str(Path(constants.AGENT_APP_SESSIONS_DIR()) / str(task_id)) + if os.path.exists(dir_task): + cleaner = TaskDirCleaner(dir_task) + cleaner.allow_cleaning() + cleaner.clean() + self.logger.info( "TASK_MISSED", extra={ @@ -353,9 +363,7 @@ def follow_daemon(self, process_cls, name, sleep_sec=5): self.api.simple_request( "UpdateTelemetry", sly.api_proto.Empty, - sly.api_proto.AgentInfo( - info=json.dumps({"gpu_info": gpu_info}) - ), + sly.api_proto.AgentInfo(info=json.dumps({"gpu_info": gpu_info})), ) last_gpu_message = GPU_FREQ @@ -389,6 +397,11 @@ def inf_loop(self): sly.function_wrapper_external_logger, self.send_connect_info, self.logger ) ) + self.thread_list.append( + self.thread_pool.submit( + sly.function_wrapper_external_logger, self.task_clear_old_data, self.logger + ) + ) if constants.DISABLE_TELEMETRY() is None: self.thread_list.append( self.thread_pool.submit( @@ -429,3 +442,19 @@ def terminate_all_deamons(): if len(futures_statuses.not_done) != 0: raise RuntimeError("AGENT: EXCEPTION IN BASE FUTURE !!!") + + def task_clear_old_data(self): + day = 60 * 60 * 24 + cleaner = AppDirCleaner(self.logger) + while True: + with self.task_pool_lock: + all_tasks = set(self.task_pool.keys()) + + try: + cleaner.auto_clean(all_tasks) + except Exception as e: + self.logger.exception(e) + # raise or not? + # raise e + + time.sleep(day) diff --git a/agent/worker/agent_utils.py b/agent/worker/agent_utils.py index 9a36c97..f3cb617 100644 --- a/agent/worker/agent_utils.py +++ b/agent/worker/agent_utils.py @@ -6,17 +6,23 @@ import queue import json +import supervisely_lib as sly + +from logging import Logger +from typing import List, Optional, Union, Container +from datetime import datetime, timedelta +from pathlib import Path from worker import constants def create_img_meta_str(img_size_bytes, width, height): - img_meta = {'size': img_size_bytes, 'width': width, 'height': height} + img_meta = {"size": img_size_bytes, "width": width, "height": height} res = json.dumps(img_meta) return res def ann_special_fields(): - return 'img_hash', 'img_ext', 'img_size_bytes' + return "img_hash", "img_ext", "img_size_bytes" # multithreading @@ -51,10 +57,10 @@ def get_log_batch_blocking(self): class TaskDirCleaner: def __init__(self, dir_task): self.dir_task = dir_task - self.marker_fpath = osp.join(self.dir_task, '__do_not_clean.marker') + self.marker_fpath = osp.join(self.dir_task, "__do_not_clean.marker") def _clean(self): - for elem in filter(lambda x: 'logs' not in x, os.listdir(self.dir_task)): + for elem in filter(lambda x: "logs" not in x, os.listdir(self.dir_task)): del_path = osp.join(self.dir_task, elem) if osp.isfile(del_path): os.remove(del_path) @@ -62,7 +68,7 @@ def _clean(self): shutil.rmtree(del_path) def forbid_dir_cleaning(self): - with open(self.marker_fpath, 'a'): + with open(self.marker_fpath, "a"): os.utime(self.marker_fpath, None) # touch file def allow_cleaning(self): @@ -80,28 +86,254 @@ def clean_forced(self): self._clean() -#@TODO: remove this method or refactor it in future (dict_name - WTF??) 
+class AppDirCleaner: + def __init__(self, logger: Logger) -> None: + self.logger = logger + + def clean_agent_logs(self): + root_path = Path(constants.AGENT_LOG_DIR()) + removed = [] + + old_logs = self._get_old_files_or_folders(root_path, only_files=True) + for log_path in old_logs: + sly.fs.silent_remove(log_path) + removed.append(log_path) + + self.logger.info(f"Removed agent logs: {removed}") + + def clean_app_sessions( + self, + auto=False, + working_apps: Optional[Container[int]] = None, + ) -> List[str]: + """Delete sessions logs and repository clones of finished/crashed apps""" + root_path = Path(constants.AGENT_APP_SESSIONS_DIR()) + cleaned_sessions = [] + + if auto is True: + old_apps = self._get_old_files_or_folders(root_path, only_folders=True) + else: + # get all files and folders + old_apps = self._get_old_files_or_folders( + root_path, only_folders=True, age_limit=timedelta(days=0) + ) + + for app in old_apps: + app_path = Path(app) + app_id = app_path.name + + if not os.path.exists(app_path / "__do_not_clean.marker"): + cleaned_sessions.append(app_id) + sly.fs.remove_dir(app) + continue + + if self._check_task_id_finished_or_crashed(app_id, working_apps): + cleaned_sessions.append(app_id) + sly.fs.remove_dir(app) + + self.logger.info(f"Removed sessions: {cleaned_sessions}") + + return cleaned_sessions + + def clean_app_files(self, cleaned_sessions: List[str]): + """Delete files, artifacts used in finished/crashed apps""" + if constants.SUPERVISELY_SYNCED_APP_DATA_CONTAINER() is not None: + root_path = Path(constants.SUPERVISELY_SYNCED_APP_DATA_CONTAINER()) + known_sessions = os.listdir(constants.AGENT_APP_SESSIONS_DIR()) + else: + return + + for app_name in os.listdir(root_path): + app_path = root_path / app_name + if os.path.isdir(app_path): + for task_id in os.listdir(app_path): + task_path = app_path / task_id + + to_del = False + if task_id in cleaned_sessions: + to_del = True + elif task_id not in known_sessions: + to_del = True + + if to_del and os.path.isdir(task_path): + sly.fs.remove_dir(task_path) + + if sly.fs.dir_empty(app_path): + sly.fs.remove_dir(app_path) + + def clean_pip_cache(self, auto=False): + root_path = Path(constants.APPS_PIP_CACHE_DIR()) + removed = [] + + for module_id in os.listdir(root_path): + module_caches_path = root_path / module_id + + if auto is True: + old_cache = self._get_old_files_or_folders(module_caches_path, only_folders=True) + else: + old_cache = self._get_old_files_or_folders( + module_caches_path, only_folders=True, age_limit=timedelta(days=0) + ) + + for ver_path in old_cache: + removed.append(ver_path) + sly.fs.remove_dir(ver_path) + + if sly.fs.dir_empty(module_caches_path): + sly.fs.remove_dir(module_caches_path) + + self.logger.info(f"Removed PIP cache: {removed}") + + def clean_git_tags(self): + # TODO: add conditions? 
+        root_path = Path(constants.APPS_STORAGE_DIR())
+        sly.fs.remove_dir(str(root_path / "github.com"))
+
+    def auto_clean(self, working_apps: Container[int]):
+        self.logger.info("Auto cleaning task started.")
+        self._allow_manual_cleaning_if_not_launched(working_apps)
+        self._apps_cleaner(working_apps, auto=True)
+        self.clean_agent_logs()
+
+    def clean_all_app_data(self, working_apps: Optional[Container[int]] = None):
+        self.logger.info("Cleaning apps data.")
+        self._apps_cleaner(working_apps, auto=False, clean_pip=False)
+        self.clean_git_tags()
+
+    def _apps_cleaner(
+        self,
+        working_apps: Optional[Container[int]],
+        auto: bool = False,
+        clean_pip: bool = True,
+    ):
+        cleaned_sessions = self.clean_app_sessions(auto=auto, working_apps=working_apps)
+        if auto is False:
+            self.clean_app_files(cleaned_sessions)
+        if clean_pip is True:
+            self.clean_pip_cache(auto=auto)
+
+    def _get_log_datetime(self, log_name) -> datetime:
+        return datetime.strptime(log_name, "log_%Y-%m-%d_%H:%M:%S.txt")
+
+    def _get_file_or_path_datetime(self, path: Union[Path, str]) -> datetime:
+        time_sec = max(os.path.getmtime(path), os.path.getatime(path))
+        return datetime.fromtimestamp(time_sec)
+
+    def _get_old_files_or_folders(
+        self,
+        parent_path: Union[Path, str],
+        only_files: bool = False,
+        only_folders: bool = False,
+        age_limit: Union[timedelta, int] = constants.AUTO_CLEAN_TIMEDELTA_DAYS(),
+    ) -> List[str]:
+        """Return absolute paths of files/folders whose last modification/access
+        time is older than constants.AUTO_CLEAN_TIMEDELTA_DAYS (default: 7 days);
+        use the `AUTO_CLEAN_INT_RANGE_DAYS` env var to override the default.
+
+        :param parent_path: path to search
+        :type parent_path: Union[Path, str]
+        :param only_files: if True, return only file paths, defaults to False
+        :type only_files: bool, optional
+        :param only_folders: if True, return only folder paths, defaults to False
+        :type only_folders: bool, optional
+        :param age_limit: max age of a file or folder.
+            If `age_limit` is an int, it is converted to `timedelta(days=age_limit)`;
+            defaults to constants.AUTO_CLEAN_TIMEDELTA_DAYS
+        :type age_limit: Union[timedelta, int], optional
+        :raises ValueError: `only_files` and `only_folders` can't be True simultaneously
+        :return: list of absolute paths
+        :rtype: List[str]
+        """
+        if only_files and only_folders:
+            raise ValueError("only_files and only_folders can't be True simultaneously.")
+
+        if isinstance(age_limit, int):
+            age_limit = timedelta(days=age_limit)
+
+        now = datetime.now()
+        ppath = Path(parent_path)
+        old_path_files = []
+        for file_or_path in os.listdir(parent_path):
+            full_path = ppath / file_or_path
+
+            if only_files and os.path.isdir(full_path):
+                continue
+            elif only_folders and os.path.isfile(full_path):
+                continue
+
+            file_datetime = self._get_file_or_path_datetime(full_path)
+            if (now - file_datetime) > age_limit:
+                old_path_files.append(str(full_path))
+
+        return old_path_files
+
+    def _check_task_id_finished_or_crashed(
+        self,
+        task_id: Union[str, int],
+        working_apps: Optional[Container[int]],
+    ) -> bool:
+        try:
+            task_id = int(task_id)
+        except ValueError as exc:
+            self.logger.exception(exc)
+            return False
+
+        if working_apps is None:
+            return False
+
+        return task_id not in working_apps
+
+    def _allow_manual_cleaning_if_not_launched(self, working_apps: Container[int]):
+        root_path = Path(constants.AGENT_APP_SESSIONS_DIR())
+        allow_for_cleaner = []
+        for task_id in os.listdir(root_path):
+            allow = False
+            try:
+                if int(task_id) not in working_apps:
+                    allow = True
+            except ValueError:
+                pass
+
+            dir_task = str(root_path / task_id)
+            if allow and os.path.exists(dir_task):
+                allow_for_cleaner.append(task_id)
+                cleaner = TaskDirCleaner(dir_task)
+                cleaner.allow_cleaning()
+                cleaner.clean()
+
+        if len(allow_for_cleaner) > 0:
+            self.logger.info(f"Files for these sessions can be manually removed: {allow_for_cleaner}")
+
+
+# @TODO: remove this method or refactor it in future (dict_name - WTF??)
 def get_single_item_or_die(src_dict, key, dict_name):
     results = src_dict.get(key, None)
     if results is None:
         raise ValueError(
-            'No values were found for {} in {}. A list with exactly one item is required.'.format(key, dict_name))
+            "No values were found for {} in {}. A list with exactly one item is required.".format(
+                key, dict_name
+            )
+        )
     if len(results) != 1:
         raise ValueError(
-            'Multiple values ({}) were found for {} in {}. A list with exactly one item is required.'.format(
-                len(results), key, dict_name))
+            "Multiple values ({}) were found for {} in {}. 
A list with exactly one item is required.".format( + len(results), key, dict_name + ) + ) return results[0] def add_creds_to_git_url(git_url): old_str = None - if 'https://' in git_url: - old_str = 'https://' - elif 'http://' in git_url: - old_str = 'http://' + if "https://" in git_url: + old_str = "https://" + elif "http://" in git_url: + old_str = "http://" res = git_url if constants.GIT_LOGIN() is not None and constants.GIT_PASSWORD() is not None: - res = git_url.replace(old_str, '{}{}:{}@'.format(old_str, constants.GIT_LOGIN(), constants.GIT_PASSWORD())) + res = git_url.replace( + old_str, "{}{}:{}@".format(old_str, constants.GIT_LOGIN(), constants.GIT_PASSWORD()) + ) return res else: return git_url diff --git a/agent/worker/constants.py b/agent/worker/constants.py index 757b540..df371ff 100644 --- a/agent/worker/constants.py +++ b/agent/worker/constants.py @@ -1,6 +1,7 @@ # coding: utf-8 import os +from datetime import timedelta from urllib.parse import urlparse import supervisely_lib as sly import hashlib @@ -48,6 +49,7 @@ _DISABLE_TELEMETRY = "DISABLE_TELEMETRY" _DEFAULT_APP_DOCKER_IMAGE = "DEFAULT_APP_DOCKER_IMAGE" _AGENT_FILES_IN_APP_CONTAINER = "AGENT_FILES_IN_APP_CONTAINER" +_AUTO_CLEAN_INT_RANGE_DAYS = "AUTO_CLEAN_INT_RANGE_DAYS" _REQUIRED_SETTINGS = [ @@ -106,6 +108,7 @@ _OFFLINE_MODE: False, _DEFAULT_APP_DOCKER_IMAGE: "supervisely/base-py-sdk", _AGENT_FILES_IN_APP_CONTAINER: "/agent-storage", + _AUTO_CLEAN_INT_RANGE_DAYS: 7, } @@ -122,10 +125,12 @@ def read_optional_setting(name): def HOST_DIR(): + """{agent root host dir}; default '~/.supervisely-agent'""" return os.environ[_AGENT_HOST_DIR] def AGENT_ROOT_DIR(): + """{agent root dir}; default '/sly_agent'""" return read_optional_setting(_AGENT_ROOT_DIR) @@ -167,6 +172,7 @@ def DOCKER_REGISTRY(): def AGENT_TASKS_DIR_HOST(): + """default: '~/.supervisely-agent/tasks""" return os.path.join(HOST_DIR(), "tasks") @@ -187,26 +193,32 @@ def DOCKER_API_CALL_TIMEOUT(): def AGENT_LOG_DIR(): + """default: /sly_agent/logs""" return os.path.join(AGENT_ROOT_DIR(), "logs") def AGENT_TASKS_DIR(): + """default: /sly_agent/tasks""" return os.path.join(AGENT_ROOT_DIR(), "tasks") def AGENT_TASK_SHARED_DIR(): + """default: /sly_agent/tasks/task_shared""" return os.path.join(AGENT_TASKS_DIR(), sly.task.paths.TASK_SHARED) def AGENT_TMP_DIR(): + """default: /sly_agent/tmp""" return os.path.join(AGENT_ROOT_DIR(), "tmp") def AGENT_IMPORT_DIR(): + """default: /sly_agent/import""" return os.path.join(AGENT_ROOT_DIR(), "import") def AGENT_STORAGE_DIR(): + """default: /sly_agent/storage""" return os.path.join(AGENT_ROOT_DIR(), "storage") @@ -239,6 +251,14 @@ def TIMEOUT_CONFIG_PATH(): return None if use_default_timeouts else "/workdir/src/configs/timeouts_for_stateless.json" +def AUTO_CLEAN_INT_RANGE_DAYS(): + return int(read_optional_setting(_AUTO_CLEAN_INT_RANGE_DAYS)) + + +def AUTO_CLEAN_TIMEDELTA_DAYS() -> timedelta: + return timedelta(days=AUTO_CLEAN_INT_RANGE_DAYS()) + + def NETW_CHUNK_SIZE(): return 1048576 @@ -337,26 +357,36 @@ def GIT_PASSWORD(): def AGENT_APP_SESSIONS_DIR(): + """default: /sly_agent/app_sessions""" return os.path.join(AGENT_ROOT_DIR(), "app_sessions") def AGENT_APP_SESSIONS_DIR_HOST(): + """default: ~/.supervisely-agent/app_sessions""" return os.path.join(HOST_DIR(), "app_sessions") def AGENT_APPS_CACHE_DIR_HOST(): + """default: ~/.supervisely-agent/apps_cache""" return os.path.join(HOST_DIR(), "apps_cache") +def AGENT_APPS_CACHE_DIR(): + """default: /sly_agent/apps_cache""" + return os.path.join(AGENT_ROOT_DIR(), "apps_cache") 
+ + def GITHUB_TOKEN(): return read_optional_setting(_GITHUB_TOKEN) def APPS_STORAGE_DIR(): + """default: /sly_agent/storage/apps""" return os.path.join(AGENT_STORAGE_DIR(), "apps") def APPS_PIP_CACHE_DIR(): + """default: /sly_agent/storage/apps_pip_cache""" return os.path.join(AGENT_STORAGE_DIR(), "apps_pip_cache") @@ -404,10 +434,12 @@ def AGENT_ID(): def SUPERVISELY_AGENT_FILES(): # /root/supervisely/agent-17 (host) -> /app/sly-files (net-client) # /root/supervisely/agent-17 (host) -> /app/sly-files (agent container) + """{agent files host dir}; default `~/supervisely/agent-###`""" return read_optional_setting(_SUPERVISELY_AGENT_FILES) def SUPERVISELY_AGENT_FILES_CONTAINER(): + """{agent files dir}; default /app/sly-files""" host_dir = SUPERVISELY_AGENT_FILES() if host_dir is None: return None @@ -416,6 +448,7 @@ def SUPERVISELY_AGENT_FILES_CONTAINER(): def AGENT_FILES_IN_APP_CONTAINER(): + """/agent-storage""" host_dir = SUPERVISELY_AGENT_FILES() if host_dir is None: return None @@ -424,6 +457,7 @@ def AGENT_FILES_IN_APP_CONTAINER(): def SUPERVISELY_SYNCED_APP_DATA(): + """default: ~/supervisely/agent-###/app_data""" agent_storage_dir = SUPERVISELY_AGENT_FILES() if agent_storage_dir is None: return None @@ -431,6 +465,7 @@ def SUPERVISELY_SYNCED_APP_DATA(): def SUPERVISELY_SYNCED_APP_DATA_CONTAINER(): + """default: /app/sly-files/app_data""" dir_in_container = SUPERVISELY_AGENT_FILES_CONTAINER() if dir_in_container is None: return None diff --git a/agent/worker/system_info.py b/agent/worker/system_info.py index 5398e9d..302ff81 100644 --- a/agent/worker/system_info.py +++ b/agent/worker/system_info.py @@ -232,9 +232,18 @@ def get_gpu_info(logger): gpu_info["device_count"] = smi.nvmlDeviceGetCount() gpu_info["device_names"] = [] gpu_info["device_memory"] = [] + gpu_info["device_capability"] = [] for idx in range(gpu_info["device_count"]): handle = smi.nvmlDeviceGetHandleByIndex(idx) + capability = smi.nvmlDeviceGetCudaComputeCapability(handle) + capability = "{major}.{minor}".format(major=capability[0], minor=capability[1]) gpu_info["device_names"].append(smi.nvmlDeviceGetName(handle)) + gpu_info["device_capability"].append( + { + "device": f"GPU {idx}", + "compute_capability": capability, + } + ) try: device_props = smi.nvmlDeviceGetMemoryInfo(handle) mem = { @@ -247,8 +256,10 @@ def get_gpu_info(logger): mem = {} finally: gpu_info["device_memory"].append(mem) + gpu_info["driver_version"] = smi.nvmlSystemGetDriverVersion() + gpu_info["cuda_version"] = smi.nvmlSystemGetCudaDriverVersion() smi.nvmlShutdown() except Exception as e: logger.warning(repr(e)) - return gpu_info \ No newline at end of file + return gpu_info diff --git a/agent/worker/task_app.py b/agent/worker/task_app.py index 2afa3b6..b6c4d7e 100644 --- a/agent/worker/task_app.py +++ b/agent/worker/task_app.py @@ -30,6 +30,8 @@ file_exists, mkdir, ) +from supervisely_lib.io.exception_handlers import handle_exceptions + _ISOLATE = "isolate" _LINUX_DEFAULT_PIP_CACHE_DIR = "/root/.cache/pip" @@ -53,6 +55,7 @@ def __init__(self, *args, **kwargs): self._requirements_path_relative = None self.host_data_dir = None self.agent_id = None + super().__init__(*args, **kwargs) def init_logger(self, loglevel=None): @@ -72,7 +75,9 @@ def init_task_dir(self): if team_id == "unknown": self.logger.warn("teamId not found in context") self.dir_apps_cache_host = os.path.join(constants.AGENT_APPS_CACHE_DIR_HOST(), str(team_id)) - sly.fs.ensure_base_path(self.dir_apps_cache_host) + self.dir_apps_cache = os.path.join(constants.AGENT_APPS_CACHE_DIR(), 
str(team_id))
+
+        sly.fs.ensure_base_path(self.dir_apps_cache)
 
         # task container path
         # self.dir_task_container = os.path.join("/sessions", str(self.info['task_id']))
@@ -253,6 +258,11 @@ def _get_task_volumes(self):
                 relative_app_data_dir,
             )
 
+            self.data_dir = os.path.join(
+                constants.SUPERVISELY_AGENT_FILES_CONTAINER(), relative_app_data_dir
+            )
+            mkdir(self.data_dir)
+
             self.logger.info(
                 "Task host data dir",
                 extra={
@@ -262,7 +272,6 @@ def _get_task_volumes(self):
                 },
             )
 
-            mkdir(self.host_data_dir)
             res[self.host_data_dir] = {"bind": _APP_CONTAINER_DATA_DIR, "mode": "rw"}
             res[constants.SUPERVISELY_AGENT_FILES()] = {
                 "bind": constants.AGENT_FILES_IN_APP_CONTAINER(),
@@ -384,6 +393,7 @@ def sync_pip_cache(self):
         else:
             self.logger.info("Use existing pip cache")
 
+    @handle_exceptions
     def find_or_run_container(self):
         add_labels = {"sly_app": "1", "app_session_id": str(self.info["task_id"])}
         sly.docker_utils.docker_pull_if_needed(
@@ -485,10 +495,10 @@ def main_step(self):
         self.exec_command(add_envs=self.main_step_envs())
         self.process_logs()
         self.drop_container_and_check_status()
-        if self.host_data_dir is not None and sly.fs.dir_exists(self.host_data_dir):
-            if sly.fs.dir_empty(self.host_data_dir):
-                sly.fs.remove_dir(self.host_data_dir)
-            parent_app_dir = Path(self.host_data_dir).parent
+        # if the container exited with a non-zero status, the call above raises and this cleanup never runs
+        if self.data_dir is not None and sly.fs.dir_exists(self.data_dir):
+            parent_app_dir = Path(self.data_dir).parent
+            sly.fs.remove_dir(self.data_dir)
             if sly.fs.dir_empty(parent_app_dir) and len(sly.fs.get_subdirs(parent_app_dir)) == 0:
                 sly.fs.remove_dir(parent_app_dir)
 
@@ -591,9 +601,11 @@ def _process_line(log_line):
             else:
                 lvl_int = sly.LOGGING_LEVELS["INFO"].int
             self.logger.log(lvl_int, msg, extra=res_log)
+            self._process_report(msg)
 
         # @TODO: parse multiline logs correctly (including exceptions)
         log_line = ""
+
         for log_line_arr in self._logs_output:
             for log_part in log_line_arr.decode("utf-8").splitlines():
                 logs_found = True
@@ -628,6 +640,10 @@ def drop_container_and_check_status(self):
         self._drop_container()
         self.logger.debug("Task container finished with status: {}".format(str(status)))
         if status != 0:
+            if len(self._task_reports) > 0:
+                last_report = self._task_reports[-1].to_dict()
+                self.logger.debug("Found error report.", extra=last_report)
+                raise sly.app.exceptions.DialogWindowError(**last_report)
             raise RuntimeError(
                 # self.logger.warn(
                 "Task container finished with non-zero status: {}".format(str(status))
diff --git a/agent/worker/task_clean_node.py b/agent/worker/task_clean_node.py
index 5a4aebe..5566e37 100644
--- a/agent/worker/task_clean_node.py
+++ b/agent/worker/task_clean_node.py
@@ -6,7 +6,7 @@
 import supervisely_lib as sly
 
 from worker.task_sly import TaskSly
-from worker.agent_utils import TaskDirCleaner
+from worker.agent_utils import TaskDirCleaner, AppDirCleaner
 from worker import constants
 
 
@@ -25,23 +25,43 @@ def remove_objects(self, storage, spaths):
 
     def remove_images(self, storage, proj_structure):
         for key, value in proj_structure.items():
-            self.logger.info('Clean dataset', extra={'proj_id': value['proj_id'], 'proj_title': value['proj_title'],
-                                                     'ds_id': value['ds_id'], 'ds_title': value['ds_title']})
-            #spaths = [spath_ext[0] for spath_ext in value['spaths']]
-            removed = self.remove_objects(storage, value['spaths'])
-            self.logger.info('Images are removed.', extra={'need_remove_cnt': len(value['spaths']), 'removed_cnt': len(removed)})
+            self.logger.info(
+                "Clean dataset",
+                extra={
+                    "proj_id": value["proj_id"],
+                    "proj_title": 
value["proj_title"], + "ds_id": value["ds_id"], + "ds_title": value["ds_title"], + }, + ) + # spaths = [spath_ext[0] for spath_ext in value['spaths']] + removed = self.remove_objects(storage, value["spaths"]) + self.logger.info( + "Images are removed.", + extra={"need_remove_cnt": len(value["spaths"]), "removed_cnt": len(removed)}, + ) def remove_weights(self, storage, paths): removed = self.remove_objects(storage, paths) - self.logger.info('Weights are removed.', extra={'need_remove_cnt': len(paths), 'removed_cnt': len(removed)}) + self.logger.info( + "Weights are removed.", + extra={"need_remove_cnt": len(paths), "removed_cnt": len(removed)}, + ) def get_dataset_images_hashes(self, dataset_id): - image_array = self.api.simple_request('GetDatasetImages', sly.api_proto.ImageArray, sly.api_proto.Id(id=dataset_id)) + image_array = self.api.simple_request( + "GetDatasetImages", sly.api_proto.ImageArray, sly.api_proto.Id(id=dataset_id) + ) img_hashes = [] - for batch_img_ids in sly.batched(list(image_array.images), constants.BATCH_SIZE_GET_IMAGES_INFO()): - images_info_proto = self.api.simple_request('GetImagesInfo', sly.api_proto.ImagesInfo, - sly.api_proto.ImageArray(images=batch_img_ids)) + for batch_img_ids in sly.batched( + list(image_array.images), constants.BATCH_SIZE_GET_IMAGES_INFO() + ): + images_info_proto = self.api.simple_request( + "GetImagesInfo", + sly.api_proto.ImagesInfo, + sly.api_proto.ImageArray(images=batch_img_ids), + ) img_hashes.extend([(info.hash, info.ext) for info in images_info_proto.infos]) return img_hashes @@ -52,67 +72,88 @@ def list_weights_to_remove(self, storage, action, input_weights_hashes): if action == "delete_all_except_selected": selected_paths = set([storage.get_storage_path(hash) for hash in input_weights_hashes]) all_paths = set([path_and_suffix[0] for path_and_suffix in storage.list_objects()]) - paths_to_remove = list(all_paths.difference(selected_paths)) # all_paths - selected_paths + paths_to_remove = list( + all_paths.difference(selected_paths) + ) # all_paths - selected_paths return paths_to_remove - raise ValueError("Unknown cleanup action", extra={'action': action}) + raise ValueError("Unknown cleanup action", extra={"action": action}) def list_images_to_remove(self, storage, action, projects): img_spaths = {} for project in projects: for dataset in project["datasets"]: - ds_id = dataset['id'] + ds_id = dataset["id"] img_spaths[ds_id] = { - 'proj_id': project['id'], - 'proj_title': project['title'], - 'ds_id': ds_id, - 'ds_title': dataset['title'], - 'spaths': [] + "proj_id": project["id"], + "proj_title": project["title"], + "ds_id": ds_id, + "ds_title": dataset["title"], + "spaths": [], } - temp_spaths = [storage.get_storage_path(hash_ext[0], hash_ext[1]) - for hash_ext in self.get_dataset_images_hashes(ds_id)] - img_spaths[ds_id]['spaths'] = temp_spaths + temp_spaths = [ + storage.get_storage_path(hash_ext[0], hash_ext[1]) + for hash_ext in self.get_dataset_images_hashes(ds_id) + ] + img_spaths[ds_id]["spaths"] = temp_spaths if action == "delete_selected": return img_spaths if action == "delete_all_except_selected": - selected_paths=[] + selected_paths = [] for key, value in img_spaths.items(): - selected_paths.extend(value['spaths']) + selected_paths.extend(value["spaths"]) all_paths = set(storage.list_objects()) - paths_to_remove = all_paths.difference(set(selected_paths)) # all_paths - selected_paths + paths_to_remove = all_paths.difference( + set(selected_paths) + ) # all_paths - selected_paths result = {} - result[0] = {'proj_id': 
-1, 'proj_title': "all cache images", 'ds_id': -1, 'ds_title': "all cache images"} - result[0]['spaths'] = paths_to_remove + result[0] = { + "proj_id": -1, + "proj_title": "all cache images", + "ds_id": -1, + "ds_title": "all cache images", + } + result[0]["spaths"] = paths_to_remove return result - raise ValueError("Unknown cleanup action", extra={'action': action}) + exception = ValueError("Unknown cleanup action") + self.logger.exception(exception, extra={"action": action}) + raise ValueError(f"Unknown cleanup action: {action}") def clean_tasks_dir(self): - self.logger.info('Will remove temporary tasks data.') + self.logger.info("Will remove temporary tasks data.") task_dir = constants.AGENT_TASKS_DIR() task_names = os.listdir(task_dir) for subdir_n in task_names: dir_task = osp.join(task_dir, subdir_n) TaskDirCleaner(dir_task).clean_forced() - self.logger.info('Temporary tasks data has been removed.') + self.logger.info("Temporary tasks data has been removed.") def task_main_func(self): self.logger.info("CLEAN_NODE_START") - if self.info['action'] == "remove_tasks_data": + if self.info["action"] == "remove_tasks_data": self.clean_tasks_dir() - else: - if 'projects' in self.info: + + if "projects" in self.info: img_storage = self.data_mgr.storage.images - proj_structure = self.list_images_to_remove(img_storage, self.info['action'], self.info['projects']) + proj_structure = self.list_images_to_remove( + img_storage, "delete_all_except_selected", self.info["projects"] + ) self.remove_images(img_storage, proj_structure) - if 'weights' in self.info: + if "weights" in self.info: nns_storage = self.data_mgr.storage.nns - weights_to_rm = self.list_weights_to_remove(nns_storage, self.info['action'], self.info['weights']) + weights_to_rm = self.list_weights_to_remove( + nns_storage, "delete_all_except_selected", self.info["weights"] + ) self.remove_weights(nns_storage, weights_to_rm) + elif self.info["action"] == "remove_pip_cache": + AppDirCleaner(self.logger).clean_pip_cache() + elif self.info["action"] == "remove_apps_data": + AppDirCleaner(self.logger).clean_all_app_data() self.logger.info("CLEAN_NODE_FINISH") diff --git a/agent/worker/task_dockerized.py b/agent/worker/task_dockerized.py index 7373cba..f2f9c6a 100644 --- a/agent/worker/task_dockerized.py +++ b/agent/worker/task_dockerized.py @@ -7,6 +7,8 @@ import copy import docker import supervisely_lib as sly +from dataclasses import dataclass +from typing import List, Optional from worker.agent_utils import TaskDirCleaner from worker import constants @@ -20,6 +22,14 @@ class TaskStep(Enum): UPLOAD = 3 +@dataclass +class ErrorReport(object): + title: Optional[str] = None + description: Optional[str] = None + + def to_dict(self) -> dict: + return {"title": self.title, "description": self.description} + # task with main work in separate container and with sequential steps class TaskDockerized(TaskSly): step_name_mapping = { @@ -46,6 +56,7 @@ def __init__(self, *args, **kwargs): self.docker_image_name = None self.docker_pulled = False # in task + self._task_reports: List[ErrorReport] = [] def init_docker_image(self): self.docker_image_name = self.info.get("docker_image", None) @@ -246,6 +257,7 @@ def _drop_container(self): def drop_container_and_check_status(self): status = self._stop_wait_container() if (len(status) > 0) and (status["StatusCode"] not in [0]): # StatusCode must exist + # if len(self._task_reports) > 0: raise RuntimeError( "Task container finished with non-zero status: {}".format(str(status)) ) @@ -285,6 +297,7 @@ def 
process_logs(self):
             log_line = log_line.decode("utf-8")
             msg, res_log, lvl = self.parse_log_line(log_line)
             output = self.call_event_function(res_log)
+            self._process_report(msg)
 
             lvl_description = sly.LOGGING_LEVELS.get(lvl, None)
             if lvl_description is not None:
@@ -296,3 +309,25 @@ def process_logs(self):
 
         if not logs_found:
             self.logger.warn("No logs obtained from container.")  # check if bug occurred
+
+    def _process_report(self, log_msg: str):
+        err_title, err_desc = None, None
+        splits = log_msg.split(":")
+
+        if splits[0].endswith("Error title"):
+            err_title = splits[-1].strip()
+        if splits[0].endswith("Error message"):
+            err_desc = splits[-1].strip()
+
+        if err_title is not None:
+            self._task_reports.append(ErrorReport(title=err_title))
+        if err_desc is not None:
+            try:
+                last_report = self._task_reports[-1]
+                if last_report.description is None:
+                    last_report.description = err_desc
+                else:
+                    self.logger.warn("Last DialogWindowError report looks inconsistent:")
+                    self.logger.warn("got an error message without a preceding title.")
+            except IndexError:
+                pass
diff --git a/agent/worker/telemetry_reporter.py b/agent/worker/telemetry_reporter.py
index 30434ab..fc83217 100644
--- a/agent/worker/telemetry_reporter.py
+++ b/agent/worker/telemetry_reporter.py
@@ -71,14 +71,41 @@ def get_telemetry_str(self):
             .get("Image", "Unavailable, may be in debug mode")
         )
 
+        # img_sizeb, nn_sizeb - legacy plugins data; {agent root}/storage/{images|models}
         img_sizeb = get_directory_size_bytes(self.data_mgr.storage.images.storage_root_path)
         nn_sizeb = get_directory_size_bytes(self.data_mgr.storage.nns.storage_root_path)
+
+        # apps cache - per-team apps cache (see task_app.init_task_dir); {agent root}/apps_cache
+        apps_cache_sizeb = get_directory_size_bytes(constants.AGENT_APPS_CACHE_DIR())
+
+        # some apps store weights in SUPERVISELY_SYNCED_APP_DATA_CONTAINER; {agent files}/app_data
+        # after v6.7.23 update, this data is deleted at the end of the task
+        models_logs_sizeb = get_directory_size_bytes(
+            constants.SUPERVISELY_SYNCED_APP_DATA_CONTAINER()
+        )
+
+        # tasks_sizeb - legacy plugins data; {agent root}/tasks
        tasks_sizeb = get_directory_size_bytes(constants.AGENT_TASKS_DIR())
+
+        # app_sessions_sizeb - size of session files: repo (sometimes) and logs (always); {agent root}/app_sessions
+        app_sessions_sizeb = get_directory_size_bytes(constants.AGENT_APP_SESSIONS_DIR())
+
+        # cache of app's git tags; {agent root}/storage/apps
+        git_tags_sizeb = get_directory_size_bytes(constants.APPS_STORAGE_DIR())
+
+        # pip's cache; {agent root}/storage/apps_pip_cache
+        pip_cache_sizeb = get_directory_size_bytes(constants.APPS_PIP_CACHE_DIR())
+
+        legacy_plugins_sizeb = img_sizeb + nn_sizeb + tasks_sizeb
+        apps_sizeb = git_tags_sizeb + pip_cache_sizeb + app_sessions_sizeb + models_logs_sizeb
+
+        total = legacy_plugins_sizeb + apps_sizeb
+
         node_storage = [
-            {"Images": bytes_to_human(img_sizeb)},
-            {"NN weights": bytes_to_human(nn_sizeb)},
-            {"Tasks": bytes_to_human(tasks_sizeb)},
-            {"Total": bytes_to_human(img_sizeb + nn_sizeb + tasks_sizeb)},
+            {"App sessions": bytes_to_human(apps_sizeb)},
+            {"PIP cache": bytes_to_human(pip_cache_sizeb)},
+            {"Plugins": bytes_to_human(legacy_plugins_sizeb)},
+            {"Total": bytes_to_human(total)},
         ]
 
         server_info = {
@@ -95,7 +122,7 @@ def get_telemetry_str(self):
     def task_main_func(self):
         try:
             self.logger.info("TELEMETRY_REPORTER_INITIALIZED")
-
+            # self.logger.debug(f"TELEMETRY REPORT: {self.get_telemetry_str()}")
             for _ in self.api.get_endless_stream(
                 "GetTelemetryTask", sly.api_proto.Task, sly.api_proto.Empty()
             ):
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..bfd4e07
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,4 @@
+[pytest]
+pythonpath = . agent
+testpaths = tests
+addopts = --ignore=supervisely-lib-folder --ignore=supervisely
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 9cb2dec..14c8013 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,11 +7,11 @@ grpcio-tools==1.47.0
 # py3exiv2==0.9.3
 packaging==21.3
 version-parser==1.0.1
-supervisely==6.72.59
+supervisely==6.72.127
 docker==3.3.0
 black
 python-slugify==6.1.2
 requests==2.28.1
-urllib3==1.26.11
-torch==1.7.1
-nvidia-ml-py==12.535.77
\ No newline at end of file
+urllib3==1.26.15
+# torch==1.7.1
+nvidia-ml-py==12.535.77
diff --git a/tests/clean_functions/conftest.py b/tests/clean_functions/conftest.py
new file mode 100644
index 0000000..7be0d3f
--- /dev/null
+++ b/tests/clean_functions/conftest.py
@@ -0,0 +1,110 @@
+from typing import Tuple
+import pytest
+import os
+from pathlib import Path
+
+from agent.worker import agent_utils
+
+
+@pytest.fixture(scope="function")
+def tmp_path(tmp_path_factory) -> Path:
+    return tmp_path_factory.mktemp("tmp")
+
+
+@pytest.fixture()
+def sly_files_path(tmp_path) -> Path:
+    """Temporary path for sly-files data"""
+    tmp_dir: Path = tmp_path / "app" / "sly-files" / "app_data/"
+    tmp_dir.mkdir(exist_ok=True, parents=True)
+    return tmp_dir
+
+
+@pytest.fixture()
+def sly_agent_path(tmp_path) -> Path:
+    """Temporary path for agent data"""
+    tmp_dir: Path = tmp_path / "sly_agent"
+    (tmp_dir / "app_sessions").mkdir(parents=True, exist_ok=True)
+    storage = tmp_dir / "storage"
+    (storage / "apps_pip_cache").mkdir(parents=True, exist_ok=True)
+    (storage / "apps").mkdir(parents=True, exist_ok=True)
+    return tmp_dir
+
+
+@pytest.fixture()
+def running_session(sly_agent_path: Path, sly_files_path: Path):
+    return run_session(sly_agent_path, sly_files_path)[0]
+
+
+@pytest.fixture()
+def stopped_session(sly_agent_path: Path, sly_files_path: Path):
+    task_id, _, _ = run_session(sly_agent_path, sly_files_path)
+    app_session = sly_agent_path / "app_sessions" / str(task_id)
+    agent_utils.TaskDirCleaner(str(app_session)).allow_cleaning()
+    return task_id
+
+
+@pytest.fixture()
+def mocked_paths(sly_agent_path: Path, sly_files_path: Path, monkeypatch):
+    storage = sly_agent_path / "storage"
+    AGENT_APP_SESSIONS_DIR = lambda: str(sly_agent_path / "app_sessions")
+    SUPERVISELY_SYNCED_APP_DATA_CONTAINER = lambda: str(sly_files_path)
+    AGENT_LOG_DIR = lambda: str(sly_agent_path / "logs")
+    APPS_PIP_CACHE_DIR = lambda: str(storage / "apps_pip_cache")
+    APPS_STORAGE_DIR = lambda: str(storage / "apps")
+
+    monkeypatch.setattr(agent_utils.constants, "AGENT_APP_SESSIONS_DIR", AGENT_APP_SESSIONS_DIR)
+    monkeypatch.setattr(
+        agent_utils.constants,
+        "SUPERVISELY_SYNCED_APP_DATA_CONTAINER",
+        SUPERVISELY_SYNCED_APP_DATA_CONTAINER,
+    )
+    monkeypatch.setattr(agent_utils.constants, "AGENT_LOG_DIR", AGENT_LOG_DIR)
+    monkeypatch.setattr(agent_utils.constants, "APPS_PIP_CACHE_DIR", APPS_PIP_CACHE_DIR)
+    monkeypatch.setattr(agent_utils.constants, "APPS_STORAGE_DIR", APPS_STORAGE_DIR)
+
+
+def _generate_id(pth: Path) -> int:
+    return len(os.listdir(pth)) + 1
+
+
+def _module_name(sly_files_path: Path) -> Tuple[str, int]:
+    ind = _generate_id(sly_files_path)
+    return f"module_{ind}", ind
+
+
+def _mkdir_and_touch(path: Path, filename: str = "randomfile.txt"):
+    path.mkdir(parents=True, exist_ok=True)
+    file = path / filename
+    with open(file, "a"):
+        os.utime(file, None)
+
+
+def run_session(agent_path: Path, files_path: Path):
+    task_id = _generate_id(agent_path / "app_sessions")
+    module, module_id = _module_name(files_path)
+
+    app_session = agent_path / "app_sessions" / str(task_id)
+    app_logs = app_session / "logs"
+    app_repo = app_session / "repo"
+    _mkdir_and_touch(app_logs)
+    _mkdir_and_touch(app_repo)
+    agent_utils.TaskDirCleaner(str(app_session)).forbid_dir_cleaning()
+
+    agent_logs = agent_path / "logs"
+    _mkdir_and_touch(agent_logs, f"logs_{task_id}")
+
+    storage = agent_path / "storage"
+    pip_cache = storage / "apps_pip_cache"
+    # module_id = _generate_id(pip_cache)
+    module_pip_cache = pip_cache / str(module_id) / "v.1"
+    _mkdir_and_touch(module_pip_cache)
+
+    app_storage = (
+        storage / "apps" / "github.com" / "supervisely-ecosystem" / module / str(module_id) / "v.1"
+    )
+    _mkdir_and_touch(app_storage)
+
+    files = files_path / module / str(task_id) / "models"
+    _mkdir_and_touch(files)
+
+    return task_id, module, module_id
diff --git a/tests/clean_functions/test_autoclean.py b/tests/clean_functions/test_autoclean.py
new file mode 100644
index 0000000..30a8705
--- /dev/null
+++ b/tests/clean_functions/test_autoclean.py
@@ -0,0 +1,132 @@
+import os
+import shutil
+from logging import getLogger
+from agent.worker import agent_utils
+
+from conftest import run_session
+
+
+logger = getLogger()
+
+
+def test_remove_old(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+    monkeypatch,
+):
+    # setup
+    def mock_age(*args, **kwargs):
+        return 0
+
+    storage = sly_agent_path / "storage"
+    monkeypatch.setattr(agent_utils.os.path, "getatime", mock_age)
+    monkeypatch.setattr(agent_utils.os.path, "getmtime", mock_age)
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    working_sessions = [running_session]
+    cleaner.auto_clean(working_sessions)
+
+    # results
+    # finished session removed
+    assert os.listdir(sly_agent_path / "app_sessions") == [str(running_session)]
+
+    # cache cleaned (it's old enough)
+    assert len(os.listdir(storage / "apps_pip_cache")) == 0
+
+    # logs cleaned (they're old enough)
+    assert len(os.listdir(sly_agent_path / "logs")) == 0
+
+    # tags untouched
+    assert len(os.listdir(storage / "apps" / "github.com" / "supervisely-ecosystem")) == 2
+
+
+def test_remove_crashed(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+    monkeypatch,
+):
+    # setup
+    def mock_age(*args, **kwargs):
+        return 0
+
+    monkeypatch.setattr(agent_utils.os.path, "getatime", mock_age)
+    monkeypatch.setattr(agent_utils.os.path, "getmtime", mock_age)
+
+    storage = sly_agent_path / "storage"
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.auto_clean(working_apps=[])
+
+    # results
+    # finished and crashed sessions removed
+    assert len(os.listdir(sly_agent_path / "app_sessions")) == 0
+
+    # cache cleaned (it's old enough)
+    assert len(os.listdir(storage / "apps_pip_cache")) == 0
+
+    # logs cleaned (they're old enough)
+    assert len(os.listdir(sly_agent_path / "logs")) == 0
+
+    # tags untouched
+    assert len(os.listdir(storage / "apps" / "github.com" / "supervisely-ecosystem")) == 2
+
+
+def test_not_old_enough(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+):
+    # setup
+    storage = sly_agent_path / "storage"
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.auto_clean(working_apps=[])
+
+    # results
+    # apps not old enough
+    assert len(os.listdir(sly_agent_path / "app_sessions")) == 2
+
+    # cache untouched (not old enough)
+    assert len(os.listdir(storage / "apps_pip_cache")) == 2
+
+    # logs untouched (not old enough)
+    assert len(os.listdir(sly_agent_path / "logs")) == 2
+
+    # tags untouched
+    assert len(os.listdir(storage / "apps" / "github.com" / "supervisely-ecosystem")) == 2
+
+
+def test_clean_files_for_non_existing_session(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+):
+    # setup
+    session_wo_info, module, _ = run_session(sly_agent_path, sly_files_path)
+    shutil.rmtree(str(sly_agent_path / "app_sessions" / str(session_wo_info)))
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.auto_clean(working_apps=[running_session])
+
+    # results
+    # apps not old enough
+    assert sorted(os.listdir(sly_agent_path / "app_sessions")) == sorted(
+        [str(running_session), str(stopped_session)]
+    )
+
+    # session_wo_info has no linked session in app_sessions
+    assert len(os.listdir(sly_files_path)) == 2
diff --git a/tests/clean_functions/test_client_clean.py b/tests/clean_functions/test_client_clean.py
new file mode 100644
index 0000000..9c04db2
--- /dev/null
+++ b/tests/clean_functions/test_client_clean.py
@@ -0,0 +1,96 @@
+import os
+import shutil
+from logging import getLogger
+from agent.worker import agent_utils
+
+from conftest import run_session
+
+
+logger = getLogger()
+
+
+def test_remove_finished(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+):
+    # setup
+    storage = sly_agent_path / "storage"
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.clean_all_app_data()
+
+    # results
+    # finished session removed
+    assert os.listdir(sly_agent_path / "app_sessions") == [str(running_session)]
+
+    # PIP cache untouched
+    assert len(os.listdir(storage / "apps_pip_cache")) == 2
+
+    # logs untouched
+    assert len(os.listdir(sly_agent_path / "logs")) == 2
+
+    # all tags removed
+    assert os.path.exists(storage / "apps")
+    assert not os.path.exists(storage / "apps" / "github.com" / "supervisely-ecosystem")
+
+
+def test_remove_pip_cache(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+    monkeypatch,
+):
+    # setup
+    def mock_age(*args, **kwargs):
+        return 0
+
+    monkeypatch.setattr(agent_utils.os.path, "getatime", mock_age)
+    monkeypatch.setattr(agent_utils.os.path, "getmtime", mock_age)
+
+    storage = sly_agent_path / "storage"
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.clean_pip_cache()
+
+    # results
+    # sessions untouched
+    assert len(os.listdir(sly_agent_path / "app_sessions")) == 2
+
+    # all pip cache removed
+    assert len(os.listdir(storage / "apps_pip_cache")) == 0
+
+    # logs untouched
+    assert len(os.listdir(sly_agent_path / "logs")) == 2
+
+    # tags untouched
+    assert len(os.listdir(storage / "apps" / "github.com" / "supervisely-ecosystem")) == 2
+
+
+def test_clean_files_for_non_existing_session(
+    running_session,
+    stopped_session,
+    sly_files_path,
+    sly_agent_path,
+    mocked_paths,
+):
+    # setup
+    session_wo_info, _, _ = run_session(sly_agent_path, sly_files_path)
+    shutil.rmtree(str(sly_agent_path / "app_sessions" / str(session_wo_info)))
+
+    # test body
+    cleaner = agent_utils.AppDirCleaner(logger)
+    cleaner.clean_all_app_data()
+
+    # results
+    # finished/crashed sessions removed
+    assert os.listdir(sly_agent_path / "app_sessions") == [str(running_session)]
+
+    # session_wo_info has no linked session in app_sessions
+    assert os.listdir(sly_files_path) == [f"module_{running_session}"]