Cuda memory auto updater (#31)
* disable warning + send cuda mem every 60 sec

* SDK 6.72.71

* fix missing gpu_info

* send gpu mem in agent's process

* torch.cuda.init()

* added nvidia-ml-py to reqs

* added get_gpu_info using pynvml

* upd field names in get_gpu_info

* add debug log msg

---------

Co-authored-by: Maxim Kolomeychenko <[email protected]>
Co-authored-by: TheoLisin <[email protected]>
3 people authored Jul 21, 2023
1 parent bbf60c8 commit a63a4db
Showing 5 changed files with 169 additions and 46 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
@@ -51,15 +51,17 @@ RUN apt-get update \
RUN pip install docker==6.0.1
RUN pip install version-parser==1.0.1
RUN pip install python-slugify==6.1.2
RUN pip install nvidia-ml-py==12.535.77

############### copy code ###############
#COPY supervisely_lib /workdir/supervisely_lib
RUN pip install httpx
RUN pip install supervisely==6.72.59
RUN pip install requests-toolbelt>=1.0.0

RUN pip install torch==1.7.1+cu110 torchvision==0.8.2+cu110 -f https://download.pytorch.org/whl/torch_stable.html

RUN pip install supervisely==6.72.71

COPY . /workdir

#ENV PYTHONPATH /workdir:/workdir/src:/workdir/supervisely_lib/worker_proto:$PYTHONPATH
34 changes: 21 additions & 13 deletions agent/worker/agent.py
@@ -9,13 +9,17 @@
import os
import supervisely_lib as sly
import uuid
import warnings

warnings.filterwarnings(action="ignore", category=UserWarning)

import torch

from worker import constants
from worker.task_factory import create_task, is_task_type
from worker.logs_to_rpc import add_task_handler
from worker.agent_utils import LogQueue
from worker.system_info import get_hw_info, get_self_docker_image_digest
from worker.system_info import get_hw_info, get_self_docker_image_digest, get_gpu_info
from worker.app_file_streamer import AppFileStreamer
from worker.telemetry_reporter import TelemetryReporter
from supervisely_lib._utils import _remove_sensitive_information
@@ -116,18 +120,7 @@ def agent_connect_initially(self):
config = {}
self.logger.warn("Docker container info unavailable, agent is running in debug VENV")

gpu_info = None
try:
gpu_info = {}
gpu_info["is_available"] = torch.cuda.is_available()
if gpu_info["is_available"]:
gpu_info["device_count"] = torch.cuda.device_count()
gpu_info["device_names"] = []
for idx in range(gpu_info["device_count"]):
gpu_info["device_names"].append(torch.cuda.get_device_name(idx))
except Exception as e:
self.logger.warning(repr(e))

gpu_info = get_gpu_info(self.logger)
hw_info["gpuinfo"] = gpu_info

self.agent_info = {
@@ -342,6 +335,8 @@ def submit_log(self):
def follow_daemon(self, process_cls, name, sleep_sec=5):
proc = process_cls()
self.daemons_list.append(proc)
GPU_FREQ = 60
last_gpu_message = 0
try:
proc.start()
while True:
@@ -351,6 +346,19 @@ def follow_daemon(self, process_cls, name, sleep_sec=5):
time.sleep(1) # an opportunity to send log
raise RuntimeError(err_msg)
time.sleep(sleep_sec)
last_gpu_message -= sleep_sec
if last_gpu_message <= 0:
gpu_info = get_gpu_info(self.logger)
self.logger.debug(f"GPU state: {gpu_info}")
self.api.simple_request(
"UpdateTelemetry",
sly.api_proto.Empty,
sly.api_proto.AgentInfo(
info=json.dumps({"gpu_info": gpu_info})
),
)
last_gpu_message = GPU_FREQ

except Exception as e:
proc.terminate()
proc.join(timeout=2)
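For orientation, here is a minimal sketch of the polling pattern the agent.py hunk above adds to follow_daemon: a countdown that re-queries and re-sends GPU info roughly every 60 seconds while the daemon loop keeps polling the child process. The query_gpu, send_gpu_info, and proc_is_alive callables are hypothetical placeholders standing in for get_gpu_info and the UpdateTelemetry request, not the agent's real API.

import time

GPU_FREQ = 60  # seconds between GPU telemetry updates


def watch_daemon(proc_is_alive, query_gpu, send_gpu_info, logger, sleep_sec=5):
    # Countdown-based throttle: GPU info is sent at most once per GPU_FREQ seconds,
    # even though the surrounding loop wakes up every sleep_sec seconds.
    last_gpu_message = 0
    while proc_is_alive():
        time.sleep(sleep_sec)
        last_gpu_message -= sleep_sec
        if last_gpu_message <= 0:
            gpu_info = query_gpu()
            logger.debug(f"GPU state: {gpu_info}")
            send_gpu_info(gpu_info)
            last_gpu_message = GPU_FREQ  # reset the countdown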
72 changes: 72 additions & 0 deletions agent/worker/system_info.py
@@ -5,8 +5,14 @@
import subprocess

import psutil
import pynvml as smi
import docker
import socket
import warnings

warnings.filterwarnings(action="ignore", category=UserWarning)

import torch

import supervisely_lib as sly

@@ -180,3 +186,69 @@ def _get_self_docker_image_digest():

def get_self_docker_image_digest():
return sly.catch_silently(_get_self_docker_image_digest)


def get_gpu_info_with_torch(logger):
torch.cuda.init()
gpu_info = None
try:
gpu_info = {}
gpu_info["is_available"] = torch.cuda.is_available()
if gpu_info["is_available"]:
gpu_info["device_count"] = torch.cuda.device_count()
gpu_info["device_names"] = []
gpu_info["device_memory"] = []
for idx in range(gpu_info["device_count"]):
gpu_info["device_names"].append(torch.cuda.get_device_name(idx))
try:
device_props = torch.cuda.get_device_properties(idx)
t = device_props.total_memory
r = torch.cuda.memory_reserved(idx)
a = torch.cuda.memory_allocated(idx)
mem = {
"total": t,
"reserved": r,
"allocated": a,
"free": t - r,
}
except Exception as e:
logger.debug(repr(e))
mem = {}
finally:
gpu_info["device_memory"].append(mem)

except Exception as e:
logger.warning(repr(e))
return gpu_info


def get_gpu_info(logger):
gpu_info = None
try:
smi.nvmlInit()
gpu_info = {}
gpu_info["is_available"] = torch.cuda.is_available()
if gpu_info["is_available"]:
gpu_info["device_count"] = smi.nvmlDeviceGetCount()
gpu_info["device_names"] = []
gpu_info["device_memory"] = []
for idx in range(gpu_info["device_count"]):
handle = smi.nvmlDeviceGetHandleByIndex(idx)
gpu_info["device_names"].append(smi.nvmlDeviceGetName(handle))
try:
device_props = smi.nvmlDeviceGetMemoryInfo(handle)
mem = {
"total": device_props.total,
"reserved": device_props.used,
"available": device_props.free,
}
except Exception as e:
logger.debug(repr(e))
mem = {}
finally:
gpu_info["device_memory"].append(mem)
smi.nvmlShutdown()

except Exception as e:
logger.warning(repr(e))
return gpu_info
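As a usage reference, the sketch below queries the same NVML fields that get_gpu_info reads, but as a standalone script; it assumes nvidia-ml-py is installed and an NVIDIA driver is present. The bytes-decoding branch is a precaution for older nvidia-ml-py releases, where nvmlDeviceGetName returns bytes rather than str.

import pynvml as smi


def print_gpu_memory():
    # Mirrors get_gpu_info(): enumerate devices and read per-device memory via NVML.
    smi.nvmlInit()
    try:
        for idx in range(smi.nvmlDeviceGetCount()):
            handle = smi.nvmlDeviceGetHandleByIndex(idx)
            name = smi.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):  # older bindings return bytes
                name = name.decode("utf-8")
            mem = smi.nvmlDeviceGetMemoryInfo(handle)
            print(f"{name}: total={mem.total} used={mem.used} free={mem.free}")
    finally:
        smi.nvmlShutdown()


if __name__ == "__main__":
    print_gpu_memory()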
102 changes: 71 additions & 31 deletions agent/worker/telemetry_reporter.py
@@ -3,15 +3,16 @@
import subprocess
from hurry.filesize import size as bytes_to_human
import json
import time
import supervisely_lib as sly

from worker.task_logged import TaskLogged
from worker import constants
from worker.system_info import get_directory_size_bytes
from worker.system_info import get_directory_size_bytes, get_gpu_info


class TelemetryReporter(TaskLogged):
NO_OUTPUT = b''
NO_OUTPUT = b""

@staticmethod
def _get_subprocess_out_if_possible_no_shell(args, timeout=None):
@@ -23,13 +24,13 @@ def _get_subprocess_out_if_possible_no_shell(args, timeout=None):

@staticmethod
def _get_subprocess_out_with_shell(args):
res = subprocess.Popen(args,
shell=True, executable="/bin/bash",
stdout=subprocess.PIPE).communicate()[0]
res = subprocess.Popen(
args, shell=True, executable="/bin/bash", stdout=subprocess.PIPE
).communicate()[0]
return res

def _get_subprocess_out_if_possible(self, proc_id, subprocess_args):
no_output = b''
no_output = b""
if proc_id in self.skip_subproc:
return no_output
res = TelemetryReporter._get_subprocess_out_with_shell(subprocess_args)
@@ -39,60 +40,99 @@ def _get_subprocess_out_if_possible(self, proc_id, subprocess_args):
return res

def __init__(self):
super().__init__({'task_id': 'telemetry'})
super().__init__({"task_id": "telemetry"})
self.skip_subproc = set()

def init_logger(self):
super().init_logger()
sly.change_formatters_default_values(self.logger, 'worker', 'telemetry')
sly.change_formatters_default_values(self.logger, "worker", "telemetry")

def get_telemetry_str(self):
htop_str = 'echo q | htop -C | ' \
'aha --line-fix | html2text -width 999 | grep -v "F1Help" | grep -v "xml version="'
htop_output = self._get_subprocess_out_if_possible('htop', [htop_str])
htop_str = (
"echo q | htop -C | "
'aha --line-fix | html2text -width 999 | grep -v "F1Help" | grep -v "xml version="'
)
htop_output = self._get_subprocess_out_if_possible("htop", [htop_str])

nvsmi_output = self.NO_OUTPUT
try:
nvsmi_output = self._get_subprocess_out_if_possible_no_shell(['nvidia-smi'], timeout=5)
nvsmi_output = self._get_subprocess_out_if_possible_no_shell(["nvidia-smi"], timeout=5)
except OSError:
pass

docker_inspect_cmd = "curl -s --unix-socket /var/run/docker.sock http://localhost/containers/$(hostname)/json"
docker_inspect_out = subprocess.Popen([docker_inspect_cmd],
shell=True, executable="/bin/bash",
stdout=subprocess.PIPE).communicate()[0]
docker_inspect_out = subprocess.Popen(
[docker_inspect_cmd], shell=True, executable="/bin/bash", stdout=subprocess.PIPE
).communicate()[0]

docker_image = json.loads(docker_inspect_out).get("Config", {}).get("Image", "Unavailable, may be in debug mode")
docker_image = (
json.loads(docker_inspect_out)
.get("Config", {})
.get("Image", "Unavailable, may be in debug mode")
)

img_sizeb = get_directory_size_bytes(self.data_mgr.storage.images.storage_root_path)
nn_sizeb = get_directory_size_bytes(self.data_mgr.storage.nns.storage_root_path)
tasks_sizeb = get_directory_size_bytes(constants.AGENT_TASKS_DIR())
node_storage = [
{'Images': bytes_to_human(img_sizeb)},
{'NN weights': bytes_to_human(nn_sizeb)},
{'Tasks': bytes_to_human(tasks_sizeb)},
{'Total': bytes_to_human(img_sizeb + nn_sizeb + tasks_sizeb)},
{"Images": bytes_to_human(img_sizeb)},
{"NN weights": bytes_to_human(nn_sizeb)},
{"Tasks": bytes_to_human(tasks_sizeb)},
{"Total": bytes_to_human(img_sizeb + nn_sizeb + tasks_sizeb)},
]

server_info = {
'htop': htop_output.decode("utf-8"),
'nvsmi': nvsmi_output.decode("utf-8"),
'node_storage': node_storage,
'docker_image': docker_image
"htop": htop_output.decode("utf-8"),
"nvsmi": nvsmi_output.decode("utf-8"),
"node_storage": node_storage,
"docker_image": docker_image,
"gpu_info": get_gpu_info(self.logger),
}

info_str = json.dumps(server_info)
return info_str

def task_main_func(self):
try:
self.logger.info('TELEMETRY_REPORTER_INITIALIZED')
self.logger.info("TELEMETRY_REPORTER_INITIALIZED")

for _ in self.api.get_endless_stream('GetTelemetryTask', sly.api_proto.Task, sly.api_proto.Empty()):
self.api.simple_request('UpdateTelemetry', sly.api_proto.Empty, sly.api_proto.AgentInfo(info=self.get_telemetry_str()))
for _ in self.api.get_endless_stream(
"GetTelemetryTask", sly.api_proto.Task, sly.api_proto.Empty()
):
self.api.simple_request(
"UpdateTelemetry",
sly.api_proto.Empty,
sly.api_proto.AgentInfo(info=self.get_telemetry_str()),
)

except Exception as e:
self.logger.critical('TELEMETRY_REPORTER_CRASHED', exc_info=True, extra={
'event_type': sly.EventType.TASK_CRASHED,
'exc_str': str(e),
})
self.logger.critical(
"TELEMETRY_REPORTER_CRASHED",
exc_info=True,
extra={
"event_type": sly.EventType.TASK_CRASHED,
"exc_str": str(e),
},
)


# class TelemetryAutoUpdater(TelemetryReporter):
# def task_main_func(self):
# try:
# self.logger.info("TELEMETRY_AUTO_UPDATER_60SEC_INITIALIZED")
# while True:
# self.api.simple_request(
# "UpdateTelemetry",
# sly.api_proto.Empty,
# sly.api_proto.AgentInfo(info=self.get_telemetry_str()),
# )
# time.sleep(60)
# except Exception as e:
# self.logger.warning(
# "TELEMETRY_AUTO_UPDATER_60SEC_CRASHED",
# exc_info=True,
# extra={
# "event_type": sly.EventType.TASK_CRASHED,
# "exc_str": str(e),
# },
# )
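The string returned by get_telemetry_str (and sent via AgentInfo(info=...)) is plain JSON; this commit adds a gpu_info key alongside the existing fields. A rough illustration of the payload shape, with placeholder values rather than real agent output:

import json

# Illustration only: keys match get_telemetry_str() above, values are made up.
server_info = {
    "htop": "<htop text output>",
    "nvsmi": "<nvidia-smi text output>",
    "node_storage": [
        {"Images": "1G"},
        {"NN weights": "512M"},
        {"Tasks": "2G"},
        {"Total": "3G"},
    ],
    "docker_image": "supervisely/agent:latest",
    "gpu_info": {
        "is_available": True,
        "device_count": 1,
        "device_names": ["NVIDIA GeForce RTX 3090"],
        "device_memory": [{"total": 25769803776, "reserved": 0, "available": 25769803776}],
    },
}
info_str = json.dumps(server_info)  # this string is what UpdateTelemetry receives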
3 changes: 2 additions & 1 deletion requirements.txt
@@ -13,4 +13,5 @@ black
python-slugify==6.1.2
requests==2.28.1
urllib3==1.26.11
torch==1.7.1
torch==1.7.1
nvidia-ml-py==12.535.77
