Skip to content

Commit

Permalink
Add Docker utils to agent (#85)
Browse files Browse the repository at this point in the history
* add docker utils from SDK

* fix docker pull progress

* use agent docker utils instead of SDK

* pylint
  • Loading branch information
NikolaiPetukhov authored Aug 7, 2024
1 parent da959ed commit 8012249
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 25 deletions.
15 changes: 11 additions & 4 deletions agent/worker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from worker import constants
from worker import agent_utils
from worker import docker_utils
from worker.task_sly import TaskSly
from worker.task_factory import create_task, is_task_type
from worker.logs_to_rpc import add_task_handler
Expand All @@ -41,7 +42,11 @@
)
from worker.app_file_streamer import AppFileStreamer
from worker.telemetry_reporter import TelemetryReporter
from supervisely_lib._utils import _remove_sensitive_information # pylint: disable=import-error, no-name-in-module

# pylint: disable=import-error, no-name-in-module
from supervisely_lib._utils import (
_remove_sensitive_information,
)


class Agent:
Expand Down Expand Up @@ -417,7 +422,9 @@ def _forget_task(self, task_id):
@staticmethod
def _remove_containers(label_filter):
dc = docker.from_env()
stop_list = dc.containers.list(all=True, filters=label_filter, sparse=False, ignore_removed=True)
stop_list = dc.containers.list(
all=True, filters=label_filter, sparse=False, ignore_removed=True
)
for cont in stop_list:
cont.remove(force=True)
return stop_list
Expand Down Expand Up @@ -645,10 +652,10 @@ def update_base_layers(self):
)
image = f"{constants.SLY_APPS_DOCKER_REGISTRY()}/{image}"

sly.docker_utils.docker_pull_if_needed(
docker_utils.docker_pull_if_needed(
self.docker_api,
image,
policy=sly.docker_utils.PullPolicy.ALWAYS,
policy=docker_utils.PullPolicy.ALWAYS,
logger=self.logger,
progress=False,
)
Expand Down
4 changes: 1 addition & 3 deletions agent/worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import supervisely_lib as sly
import hashlib
import re
from supervisely_lib.io.docker_utils import ( # pylint: disable=import-error, no-name-in-module
PullPolicy,
)
from worker.docker_utils import PullPolicy


_SERVER_ADDRESS = "SERVER_ADDRESS"
Expand Down
196 changes: 196 additions & 0 deletions agent/worker/docker_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# coding: utf-8
from __future__ import annotations

import json
from enum import Enum
from typing import Optional

from supervisely.app import DialogWindowError
from supervisely.task.progress import Progress


class PullPolicy(Enum):
def __str__(self):
return str(self.value)

ALWAYS = "Always".lower()
IF_AVAILABLE = "IfAvailable".lower()
IF_NOT_PRESENT = "IfNotPresent".lower()
NEVER = "Never".lower()


class PullStatus(Enum):
START = "Pulling fs layer"
DOWNLOAD = "Downloading"
EXTRACT = "Extracting"
COMPLETE_LOAD = "Download complete"
COMPLETE_PULL = "Pull complete"
OTHER = "Other (unknown)"

def is_equal(self, status: str) -> bool:
return status == self.value

@classmethod
def from_str(cls, status: Optional[str]) -> PullStatus:
dct = {
"Pulling fs layer": PullStatus.START,
"Downloading": PullStatus.DOWNLOAD,
"Extracting": PullStatus.EXTRACT,
"Download complete": PullStatus.COMPLETE_LOAD,
"Pull complete": PullStatus.COMPLETE_PULL,
}
return dct.get(status, PullStatus.OTHER)


def docker_pull_if_needed(docker_api, docker_image_name, policy, logger, progress=True):
logger.info(
"docker_pull_if_needed args",
extra={
"policy": policy,
"type(policy)": type(policy),
"policy == PullPolicy.ALWAYS": str(policy) == str(PullPolicy.ALWAYS),
"policy == PullPolicy.NEVER": str(policy) == str(PullPolicy.NEVER),
"policy == PullPolicy.IF_NOT_PRESENT": str(policy) == str(PullPolicy.IF_NOT_PRESENT),
"policy == PullPolicy.IF_AVAILABLE": str(policy) == str(PullPolicy.IF_AVAILABLE),
},
)
if str(policy) == str(PullPolicy.ALWAYS):
if progress is False:
_docker_pull(docker_api, docker_image_name, logger)
else:
_docker_pull_progress(docker_api, docker_image_name, logger)
elif str(policy) == str(PullPolicy.NEVER):
pass
elif str(policy) == str(PullPolicy.IF_NOT_PRESENT):
if not _docker_image_exists(docker_api, docker_image_name):
if progress is False:
_docker_pull(docker_api, docker_image_name, logger)
else:
_docker_pull_progress(docker_api, docker_image_name, logger)
elif str(policy) == str(PullPolicy.IF_AVAILABLE):
if progress is False:
_docker_pull(docker_api, docker_image_name, logger, raise_exception=True)
else:
_docker_pull_progress(docker_api, docker_image_name, logger, raise_exception=True)
else:
raise RuntimeError(f"Unknown pull policy {str(policy)}")
if not _docker_image_exists(docker_api, docker_image_name):
raise DialogWindowError(
title=f"Docker image {docker_image_name} not found. Agent's PULL_POLICY is {str(policy)}.",
description=(
"The initiation of the pulling process was either prevented due to the pull policy settings "
"or it was halted mid-way because the host lacks sufficient disk space."
),
)


def _docker_pull(docker_api, docker_image_name, logger, raise_exception=True):
from docker.errors import DockerException

logger.info("Docker image will be pulled", extra={"image_name": docker_image_name})
progress_dummy = Progress("Pulling image...", 1, ext_logger=logger)
progress_dummy.iter_done_report()
try:
pulled_img = docker_api.images.pull(docker_image_name)
logger.info(
"Docker image has been pulled",
extra={"pulled": {"tags": pulled_img.tags, "id": pulled_img.id}},
)
except DockerException as e:
if raise_exception is True:
raise DockerException(
"Unable to pull image: see actual error above. "
"Please, run the task again or contact support team."
)
else:
logger.warn("Pulling step is skipped. Unable to pull image: {!r}.".format(str(e)))


def _docker_pull_progress(docker_api, docker_image_name, logger, raise_exception=True):
logger.info("Docker image will be pulled", extra={"image_name": docker_image_name})
from docker.errors import DockerException

try:
layers_total_load = {}
layers_current_load = {}
layers_total_extract = {}
layers_current_extract = {}
started = set()
loaded = set()
pulled = set()

progress_full = Progress("Preparing dockerimage", 1, ext_logger=logger)
progres_ext = Progress("Extracting layers", 1, is_size=True, ext_logger=logger)
progress_load = Progress("Downloading layers", 1, is_size=True, ext_logger=logger)

for line in docker_api.api.pull(docker_image_name, stream=True, decode=True):
status = PullStatus.from_str(line.get("status", None))
layer_id = line.get("id", None)
progress_details = line.get("progressDetail", {})
need_report = True

if status is PullStatus.START:
started.add(layer_id)
need_report = False
elif status is PullStatus.DOWNLOAD:
layers_current_load[layer_id] = progress_details.get("current", 0)
layers_total_load[layer_id] = progress_details.get(
"total", layers_current_load[layer_id]
)
total_load = sum(layers_total_load.values())
current_load = sum(layers_current_load.values())
if total_load > progress_load.total:
progress_load.set(current_load, total_load)
elif (current_load - progress_load.current) / total_load > 0.01:
progress_load.set(current_load, total_load)
else:
need_report = False
elif status is PullStatus.COMPLETE_LOAD:
loaded.add(layer_id)
elif status is PullStatus.EXTRACT:
layers_current_extract[layer_id] = progress_details.get("current", 0)
layers_total_extract[layer_id] = progress_details.get(
"total", layers_current_extract[layer_id]
)
total_ext = sum(layers_total_extract.values())
current_ext = sum(layers_current_extract.values())
if total_ext > progres_ext.total:
progres_ext.set(current_ext, total_ext)
elif (current_ext - progres_ext.current) / total_ext > 0.01:
progres_ext.set(current_ext, total_ext)
else:
need_report = False
elif status is PullStatus.COMPLETE_PULL:
pulled.add(layer_id)

if started != pulled:
if need_report:
if started == loaded:
progres_ext.report_progress()
else:
progress_load.report_progress()
elif len(pulled) > 0:
progress_full.report_progress()

progress_full.iter_done()
progress_full.report_progress()
logger.info("Docker image has been pulled", extra={"image_name": docker_image_name})
except DockerException as e:
if raise_exception is True:
raise e
# raise DockerException(
# "Unable to pull image: see actual error above. "
# "Please, run the task again or contact support team."
# )
else:
logger.warn("Pulling step is skipped. Unable to pull image: {!r}.".format(repr(e)))


def _docker_image_exists(docker_api, docker_image_name):
from docker.errors import ImageNotFound

try:
docker_img = docker_api.images.get(docker_image_name)
except ImageNotFound:
return False
return True
3 changes: 2 additions & 1 deletion agent/worker/task_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)

from worker import constants
from worker import docker_utils
from worker.agent_utils import (
filter_log_line,
pip_req_satisfied_filter,
Expand Down Expand Up @@ -520,7 +521,7 @@ def sync_pip_cache(self):
@handle_exceptions
def find_or_run_container(self):
add_labels = {"sly_app": "1", "app_session_id": str(self.info["task_id"])}
sly.docker_utils.docker_pull_if_needed(
docker_utils.docker_pull_if_needed(
self._docker_api,
self.docker_image_name,
constants.PULL_POLICY(),
Expand Down
33 changes: 20 additions & 13 deletions agent/worker/task_pull_docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
import supervisely_lib as sly

from worker import constants
from worker import docker_utils
from worker.task_sly import TaskSly


class TaskPullDockerImage(TaskSly):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.docker_runtime = 'runc' # or 'nvidia'
self.docker_runtime = "runc" # or 'nvidia'
self._docker_api = None # must be set by someone

self.docker_image_name = self.info.get('docker_image', None)
if self.docker_image_name is not None and ':' not in self.docker_image_name:
self.docker_image_name += ':latest'
self.docker_image_name = self.info.get("docker_image", None)
if self.docker_image_name is not None and ":" not in self.docker_image_name:
self.docker_image_name += ":latest"
self.docker_pulled = False # in task

@property
Expand All @@ -27,14 +28,18 @@ def docker_api(self, val):
self._docker_api = val

def task_main_func(self):
self.logger.info('TASK_START', extra={'event_type': sly.EventType.TASK_STARTED})
sly.docker_utils.docker_pull_if_needed(self._docker_api, self.docker_image_name, self.info['pull_policy'], self.logger)
self.logger.info("TASK_START", extra={"event_type": sly.EventType.TASK_STARTED})
docker_utils.docker_pull_if_needed(
self._docker_api, self.docker_image_name, self.info["pull_policy"], self.logger
)
docker_img = self._docker_api.images.get(self.docker_image_name)
if constants.CHECK_VERSION_COMPATIBILITY():
self._validate_version(self.info["agent_version"], docker_img.labels.get("VERSION", None))
self._validate_version(
self.info["agent_version"], docker_img.labels.get("VERSION", None)
)

def _validate_version(self, agent_image, plugin_image):
self.logger.info('Check if agent and plugin versions are compatible')
self.logger.info("Check if agent and plugin versions are compatible")

def get_version(docker_image):
if docker_image is None:
Expand All @@ -48,22 +53,24 @@ def get_version(docker_image):
plugin_version = get_version(plugin_image)

if agent_version is None:
self.logger.info('Unknown agent version')
self.logger.info("Unknown agent version")
return

if plugin_version is None:
self.logger.info('Unknown plugin version')
self.logger.info("Unknown plugin version")
return

av = version.parse(agent_version)
pv = version.parse(plugin_version)

if type(av) is version.LegacyVersion or type(pv) is version.LegacyVersion:
self.logger.info('Invalid semantic version, can not compare')
self.logger.info("Invalid semantic version, can not compare")
return

if av.release[0] < pv.release[0]:
self.logger.critical('Agent version is lower than plugin version. Please, upgrade agent.')
self.logger.critical(
"Agent version is lower than plugin version. Please, upgrade agent."
)

def end_log_stop(self):
return sly.EventType.TASK_STOPPED
Expand All @@ -75,4 +82,4 @@ def end_log_finish(self):
return sly.EventType.TASK_FINISHED

def report_start(self):
pass
pass
9 changes: 5 additions & 4 deletions agent/worker/task_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from docker.errors import DockerException, ImageNotFound
from worker import constants
from worker import agent_utils
from worker import docker_utils
from worker.system_info import get_container_info


Expand Down Expand Up @@ -57,15 +58,15 @@ def task_main_func(self):
image = envs[constants._DOCKER_IMAGE]

# Pull new image if needed
if envs.get(constants._PULL_POLICY) != str(sly.docker_utils.PullPolicy.NEVER):
sly.docker_utils._docker_pull_progress(self._docker_api, image, self.logger)
if envs.get(constants._PULL_POLICY) != str(docker_utils.PullPolicy.NEVER):
docker_utils._docker_pull_progress(self._docker_api, image, self.logger)

# Pull net-client if needed
net_container_name = constants.NET_CLIENT_CONTAINER_NAME()
try:
sly_net_container = self._docker_api.containers.get(net_container_name)

if envs.get(constants._PULL_POLICY) != str(sly.docker_utils.PullPolicy.NEVER):
if envs.get(constants._PULL_POLICY) != str(docker_utils.PullPolicy.NEVER):
sly_net_client_image_name = None

if use_options:
Expand Down Expand Up @@ -132,5 +133,5 @@ def check_and_pull_sly_net_if_needed(
return False
else:
logger.info("Found new version of sly-net-client. Pulling...")
sly.docker_utils._docker_pull_progress(dc, sly_net_client_image_name, logger)
docker_utils._docker_pull_progress(dc, sly_net_client_image_name, logger)
return True

0 comments on commit 8012249

Please sign in to comment.