Skip to content

Commit

Permalink
docker pull rework
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolaiPetukhov committed Oct 29, 2024
1 parent 0d19c2b commit 50302af
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 117 deletions.
47 changes: 8 additions & 39 deletions agent/worker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def __init__(self):
self.thread_pool = ThreadPoolExecutor(max_workers=10)
self.thread_list = []
self.daemons_list = []
self.docker_auths = {}

self._remove_old_agent()
self._validate_duplicated_agents()
Expand All @@ -85,7 +84,6 @@ def __init__(self):
self.docker_api = docker.from_env(
version="auto", timeout=constants.DOCKER_API_CALL_TIMEOUT()
)
self._docker_login()

self.logger.info("Agent is ready to get tasks.")
self.api = sly.AgentAPI(
Expand Down Expand Up @@ -373,9 +371,7 @@ def start_task(self, task):
break

if need_skip is False:
self.task_pool[task_id] = create_task(
task, self.docker_api, docker_auths=self.docker_auths
)
self.task_pool[task_id] = create_task(task, self.docker_api)
self.task_pool[task_id].start()
else:
self.logger.warning(
Expand Down Expand Up @@ -454,14 +450,6 @@ def _stop_missed_containers(self, ecosystem_token):
},
)

def _docker_login(self):
doc_logs = constants.DOCKER_LOGIN().split(",")
doc_pasws = constants.DOCKER_PASSWORD().split(",")
doc_regs = constants.DOCKER_REGISTRY().split(",")
for login, pasw, reg in zip(doc_logs, doc_pasws, doc_regs):
self.docker_auths.update({reg: {"username": login, "password": pasw}})
agent_utils.docker_login(self.docker_api, self.logger) # TODO: shoild delete?

def submit_log(self):
while True:
log_lines = self.log_queue.get_log_batch_nowait()
Expand Down Expand Up @@ -665,32 +653,13 @@ def update_base_layers(self):
)
image = f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{image}"

try:
docker_utils.docker_pull_if_needed(
self.docker_api,
image,
policy=docker_utils.PullPolicy.ALWAYS,
logger=self.logger,
progress=False,
docker_auths=self.docker_auths,
)
except DockerException as e:
if "no basic auth credentials" in str(e).lower():
self.logger.warn(
f"Failed to pull docker image '{image}'. Will try to login and pull again",
exc_info=True,
)
self._docker_login()
docker_utils.docker_pull_if_needed(
self.docker_api,
image,
policy=docker_utils.PullPolicy.ALWAYS,
logger=self.logger,
progress=False,
docker_auths=self.docker_auths,
)
else:
raise e
docker_utils.docker_pull_if_needed(
self.docker_api,
image,
policy=docker_utils.PullPolicy.ALWAYS,
logger=self.logger,
progress=False,
)

self.logger.info(f"Docker image '{image}' has been pulled successfully")
pulled.append(image)
Expand Down
17 changes: 0 additions & 17 deletions agent/worker/agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,20 +1107,3 @@ def maybe_update_runtime():
def convert_millicores_to_cpu_quota(millicores, cpu_period=100000):
cpu_quota = (millicores / 1000) * cpu_period
return int(cpu_quota)


def docker_login(docker_api, logger):
doc_logs = constants.DOCKER_LOGIN().split(",")
doc_pasws = constants.DOCKER_PASSWORD().split(",")
doc_regs = constants.DOCKER_REGISTRY().split(",")

for login, password, registry in zip(doc_logs, doc_pasws, doc_regs):
if registry:
try:
doc_login = docker_api.login(username=login, password=password, registry=registry)
logger.info(
"DOCKER_CLIENT_LOGIN_SUCCESS", extra={**doc_login, "registry": registry}
)
except Exception as e:
if not constants.OFFLINE_MODE():
raise e
53 changes: 34 additions & 19 deletions agent/worker/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

import json
from enum import Enum
from typing import Optional
import time
from typing import Dict, Optional

from supervisely.app import DialogWindowError
from supervisely.task.progress import Progress

from worker import constants

PULL_RETRIES = 1

PULL_RETRIES = 5
PULL_RETRY_DELAY = 5


class PullPolicy(Enum):
Expand Down Expand Up @@ -45,9 +49,22 @@ def from_str(cls, status: Optional[str]) -> PullStatus:
return dct.get(status, PullStatus.OTHER)


def docker_pull_if_needed(
docker_api, docker_image_name, policy, logger, progress=True, docker_auths=None
):
def _auths_from_env() -> Dict:
doc_logs = constants.DOCKER_LOGIN().split(",")
doc_pasws = constants.DOCKER_PASSWORD().split(",")
doc_regs = constants.DOCKER_REGISTRY().split(",")
auths = {}
for login, pasw, reg in zip(doc_logs, doc_pasws, doc_regs):
auths.update({reg: {"username": login, "password": pasw}})
return auths


def _registry_auth_from_env(registry: str) -> Dict:
auths = _auths_from_env()
return auths.get(registry, None)


def docker_pull_if_needed(docker_api, docker_image_name, policy, logger, progress=True):
logger.info(
"docker_pull_if_needed args",
extra={
Expand All @@ -61,35 +78,31 @@ def docker_pull_if_needed(
)
if str(policy) == str(PullPolicy.ALWAYS):
if progress is False:
_docker_pull(docker_api, docker_image_name, logger, docker_auths=docker_auths)
_docker_pull(docker_api, docker_image_name, logger)
else:
_docker_pull_progress(docker_api, docker_image_name, logger, docker_auths=docker_auths)
_docker_pull_progress(docker_api, docker_image_name, logger)
elif str(policy) == str(PullPolicy.NEVER):
pass
elif str(policy) == str(PullPolicy.IF_NOT_PRESENT):
if not _docker_image_exists(docker_api, docker_image_name):
if progress is False:
_docker_pull(docker_api, docker_image_name, logger, docker_auths=docker_auths)
_docker_pull(docker_api, docker_image_name, logger)
else:
_docker_pull_progress(
docker_api, docker_image_name, logger, docker_auths=docker_auths
)
_docker_pull_progress(docker_api, docker_image_name, logger)
elif str(policy) == str(PullPolicy.IF_AVAILABLE):
if progress is False:
_docker_pull(
docker_api,
docker_image_name,
logger,
raise_exception=True,
docker_auths=docker_auths,
)
else:
_docker_pull_progress(
docker_api,
docker_image_name,
logger,
raise_exception=True,
docker_auths=docker_auths,
)
else:
raise RuntimeError(f"Unknown pull policy {str(policy)}")
Expand All @@ -115,14 +128,14 @@ def resolve_registry(docker_image_name):
return None


def _docker_pull(docker_api, docker_image_name, logger, raise_exception=True, docker_auths=None):
def _docker_pull(docker_api, docker_image_name, logger, raise_exception=True):
from docker.errors import DockerException

logger.info("Docker image will be pulled", extra={"image_name": docker_image_name})
registry = resolve_registry(docker_image_name)
if docker_auths is None:
docker_auths = {}
auth = docker_auths.get(registry, None)
auth = _registry_auth_from_env(registry)
for i in range(0, PULL_RETRIES + 1):
progress_dummy = Progress(
"Pulling image..." + f" (retry {i}/{PULL_RETRIES})" if i > 0 else "",
Expand Down Expand Up @@ -152,18 +165,18 @@ def _docker_pull(docker_api, docker_image_name, logger, raise_exception=True, do
)
return
logger.warning("Unable to pull image: %s", str(e))
logger.info("Retrying in %d seconds...", PULL_RETRY_DELAY)
time.sleep(PULL_RETRY_DELAY)


def _docker_pull_progress(
docker_api, docker_image_name, logger, raise_exception=True, docker_auths=None
):
def _docker_pull_progress(docker_api, docker_image_name, logger, raise_exception=True):
logger.info("Docker image will be pulled", extra={"image_name": docker_image_name})
from docker.errors import DockerException

registry = resolve_registry(docker_image_name)
if docker_auths is None:
docker_auths = {}
auth = docker_auths.get(registry, None)
auth = _registry_auth_from_env(registry)
for i in range(0, PULL_RETRIES + 1):
try:
layers_total_load = {}
Expand Down Expand Up @@ -261,6 +274,8 @@ def _docker_pull_progress(
)
return
logger.warning("Unable to pull image: %s", str(e))
logger.info("Retrying in %d seconds...", PULL_RETRY_DELAY)
time.sleep(PULL_RETRY_DELAY)


def _docker_image_exists(docker_api, docker_image_name):
Expand Down
33 changes: 9 additions & 24 deletions agent/worker/task_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,28 +525,13 @@ def sync_pip_cache(self):
def find_or_run_container(self):
add_labels = {"sly_app": "1", "app_session_id": str(self.info["task_id"])}
self._docker_api.api.pull(auth_config={"username": "", "password": ""})
try:
docker_utils.docker_pull_if_needed(
self._docker_api,
self.docker_image_name,
constants.PULL_POLICY(),
self.logger,
)
except DockerException as e:
if "no basic auth credentials" in str(e).lower():
self.logger.warn(
f"Failed to pull docker image '{self.docker_image_name}'. Will try to login and pull again",
exc_info=True,
)
agent_utils.docker_login(self.docker_api, self.logger)
docker_utils.docker_pull_if_needed(
self._docker_api,
self.docker_image_name,
constants.PULL_POLICY(),
self.logger,
)
else:
raise e
docker_utils.docker_pull_if_needed(
self._docker_api,
self.docker_image_name,
constants.PULL_POLICY(),
self.logger,
)

self.sync_pip_cache()
if self._container is None:
try:
Expand Down Expand Up @@ -601,9 +586,9 @@ def find_or_run_container(self):
self.logger.info("pip second install for old agents is finished")

def get_spawn_entrypoint(self):
inf_command = "while true; do sleep 30; done;"
inf_command = " while true; do sleep 30; done;"
self.logger.info("Infinite command", extra={"command": inf_command})
return ["sh", "-c", inf_command]
return ["/usr/bin/timeout", timeout, "sh", "-c", inf_command]

def _exec_command(self, command, add_envs=None, container_id=None):
add_envs = sly.take_with_default(add_envs, {})
Expand Down
3 changes: 1 addition & 2 deletions agent/worker/task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
}


def create_task(task_msg, docker_api: DockerClient, docker_auths: Dict = None) -> TaskSly:
def create_task(task_msg, docker_api: DockerClient) -> TaskSly:
task_id = task_msg.get("task_id", None)
task_type = get_run_mode(docker_api, task_msg)
task_cls = _task_class_mapping.get(task_type, None)
Expand All @@ -60,7 +60,6 @@ def create_task(task_msg, docker_api: DockerClient, docker_auths: Dict = None) -
task_obj = task_cls(task_msg)
if issubclass(task_cls, TaskDockerized) or (task_msg["task_type"] == "update_agent"):
task_obj.docker_api = docker_api
task_obj.docker_auths = docker_auths
return task_obj


Expand Down
20 changes: 4 additions & 16 deletions agent/worker/task_pull_docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,10 @@ def docker_api(self, val):

def task_main_func(self):
self.logger.info("TASK_START", extra={"event_type": sly.EventType.TASK_STARTED})
try:
docker_utils.docker_pull_if_needed(
self._docker_api, self.docker_image_name, self.info["pull_policy"], self.logger
)
except DockerException as e:
if "no basic auth credentials" in str(e).lower():
self.logger.warn(
f"Failed to pull docker image '{self.docker_image_name}'. Will try to login and pull again",
exc_info=True,
)
agent_utils.docker_login(self.docker_api, self.logger)
docker_utils.docker_pull_if_needed(
self._docker_api, self.docker_image_name, self.info["pull_policy"], self.logger
)
else:
raise e
docker_utils.docker_pull_if_needed(
self._docker_api, self.docker_image_name, self.info["pull_policy"], self.logger
)

docker_img = self._docker_api.images.get(self.docker_image_name)
if constants.CHECK_VERSION_COMPATIBILITY():
self._validate_version(
Expand Down

0 comments on commit 50302af

Please sign in to comment.