From 4b8a49e6866b181f7ee663b75073cec89d404e2a Mon Sep 17 00:00:00 2001 From: TheoLisin Date: Tue, 28 Nov 2023 22:16:30 +0000 Subject: [PATCH] cross agent lock via filelock --- Dockerfile | 1 + agent/worker/agent_utils.py | 30 +++++++++++++++++------------- agent/worker/task_dockerized.py | 29 +++++++++++++++++------------ requirements.txt | 1 + 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/Dockerfile b/Dockerfile index 568c7a6..850a2be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -56,6 +56,7 @@ RUN pip install nvidia-ml-py==12.535.77 ############### copy code ############### #COPY supervisely_lib /workdir/supervisely_lib RUN pip install httpx +RUN pip install filelock==3.13.1 RUN pip install requests-toolbelt>=1.0.0 RUN pip install torch==1.7.1+cu110 torchvision==0.8.2+cu110 -f https://download.pytorch.org/whl/torch_stable.html diff --git a/agent/worker/agent_utils.py b/agent/worker/agent_utils.py index 9ea1523..1c59653 100644 --- a/agent/worker/agent_utils.py +++ b/agent/worker/agent_utils.py @@ -12,6 +12,7 @@ from logging import Logger from pathlib import Path from typing import Callable, Container, List, Optional, Union +from filelock import FileLock import supervisely_lib as sly from worker import constants @@ -351,24 +352,27 @@ def _parse_all_hists(self, hist_paths: List[str]) -> List[str]: return list(to_remove.keys()) def _parse_and_update_history(self, hist_path: str, to_remove: dict): - with open(hist_path, "r") as json_file: - images_data: dict = json.load(json_file) + hist_lock = FileLock(f"{hist_path}.lock") - rest_images = {} - for image, last_date in images_data.items(): - if self._is_outdated(last_date): - to_remove[image] = last_date - else: - rest_images[image] = last_date - if image in to_remove: - del to_remove[image] + with hist_lock: + with open(hist_path, "r") as json_file: + images_data: dict = json.load(json_file) + + rest_images = {} + for image, last_date in images_data.items(): + if self._is_outdated(last_date): + to_remove[image] = last_date + else: + rest_images[image] = last_date + if image in to_remove: + del to_remove[image] - with open(hist_path, "w") as json_file: - json.dump(rest_images, json_file, indent=4) + with open(hist_path, "w") as json_file: + json.dump(rest_images, json_file, indent=4) return to_remove - def _is_outdated(self, last_date: Union[str, datetime]): + def _is_outdated(self, last_date: Union[str, datetime]) -> bool: last_date_ts = last_date if isinstance(last_date, str): last_date_ts = datetime.strptime(last_date, "%Y-%m-%dT%H:%M") diff --git a/agent/worker/task_dockerized.py b/agent/worker/task_dockerized.py index 732a302..d3252c7 100644 --- a/agent/worker/task_dockerized.py +++ b/agent/worker/task_dockerized.py @@ -9,9 +9,9 @@ from pathlib import Path from threading import Lock from typing import Callable, List, Optional -from multiprocessing import Lock as ProcessLock import docker +from filelock import FileLock import supervisely_lib as sly from worker import constants from worker.agent_utils import ( @@ -68,15 +68,23 @@ def __init__(self, *args, **kwargs): self._container_name = None self._task_reports: List[ErrorReport] = [] self._log_filters = [pip_req_satisfied_filter] # post_get_request_filter - self._process_lock = ProcessLock() + + self._history_file = None + if constants.CROSS_AGENT_TMP_DIR() is not None: + self._history_file = os.path.join( + constants.CROSS_AGENT_TMP_DIR(), f"docker-images-history-{constants.TOKEN()}.json" + ) + self._history_file_lock = FileLock(f"{self._history_file}.lock") def init_docker_image(self): self.docker_image_name = self.info.get("docker_image", None) if self.docker_image_name is not None and ":" not in self.docker_image_name: self.docker_image_name += ":latest" - + if constants.CROSS_AGENT_TMP_DIR() is None: - self.logger.debug("CROSS_AGENT_TMP_DIR has not been set; the process of removing unused Docker will not be executed") + self.logger.debug( + "CROSS_AGENT_TMP_DIR has not been set; the process of removing unused Docker will not be executed" + ) else: self.update_image_history(self.docker_image_name) @@ -365,19 +373,16 @@ def _process_report(self, log_msg: str): pass def update_image_history(self, image_name): - log_folder = constants.CROSS_AGENT_TMP_DIR() - - history_file = os.path.join(log_folder, f"docker-images-history-{constants.TOKEN()}.json") cur_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M") - - with self._process_lock: - if sly.fs.file_exists(history_file): - with open(history_file, "r") as json_file: + + with self._history_file_lock: + if sly.fs.file_exists(self._history_file): + with open(self._history_file, "r") as json_file: images_stat = json.load(json_file) else: images_stat = {} images_stat[image_name] = cur_date - with open(history_file, "w") as json_file: + with open(self._history_file, "w") as json_file: json.dump(images_stat, json_file, indent=4) diff --git a/requirements.txt b/requirements.txt index 2ae69f6..0924415 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ requests==2.28.1 urllib3==1.26.15 # torch==1.7.1 nvidia-ml-py==12.535.77 +filelock==3.13.1