From a7b8d666c601752d75164b6a2bb888a136707778 Mon Sep 17 00:00:00 2001 From: Mateusz Date: Mon, 29 Jan 2024 14:12:56 +0100 Subject: [PATCH] Update node_manage.html and gpu_monitor.py, and fix node_websocket_client.py --- .../templates/manager/node_manage.html | 4 +- backend/manager/views.py | 195 +++++++++++------- backend/node/functions/gpu_monitor.py | 5 +- backend/node/node_websocket_client.py | 12 +- 4 files changed, 129 insertions(+), 87 deletions(-) diff --git a/backend/manager/templates/manager/node_manage.html b/backend/manager/templates/manager/node_manage.html index 94270d4..cb7664d 100644 --- a/backend/manager/templates/manager/node_manage.html +++ b/backend/manager/templates/manager/node_manage.html @@ -1,7 +1,7 @@ {% extends "manager/base.html" %} -{% block title %}Manage node{% endblock %} -{% block name %}Manage node{% endblock %} +{% block title %}Manage node {{node_id}}{% endblock %} +{% block name %}Manage node {{node_id}}{% endblock %} {% block content %}
diff --git a/backend/manager/views.py b/backend/manager/views.py index 21842cb..ab41c03 100644 --- a/backend/manager/views.py +++ b/backend/manager/views.py @@ -25,6 +25,7 @@ from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from datetime import timedelta +import time @csrf_exempt def add_task(request): @@ -144,7 +145,7 @@ def manage_node(self, request, node_id): # Logika do wyświetlenia szczegółów i zarządzania konkretnym węzłem gpu_list = Gpus.objects.all() gpus = Gpus.objects.filter(node_id=node_id) - return render(request, "manager/node_manage.html", {"gpus": gpus,"node_id":1}) + return render(request, "manager/node_manage.html", {"gpus": gpus,"node_id":node_id}) # def refresh_gpus(self,request, node_id): @@ -159,7 +160,7 @@ def manage_node(self, request, node_id): # ) # gpus = Gpus.objects.filter(node_id=node_id) # # Przekierowanie z powrotem do strony zarządzania nodem - # return redirect(reverse('manage_node', kwargs={'node_id': node_id})) + # return redirect(reverse("manage_node", kwargs={"node_id": node_id})) def refresh_gpus_ajax(request, node_id): @@ -172,24 +173,32 @@ def refresh_gpus_ajax(request, node_id): "node_id":node_id, } ) - - gpus = Gpus.objects.filter(node_id=node_id) + time.sleep(3) + node = get_object_or_404(Nodes, id=node_id) + if not node: + return Response({"error": "Node not found."}, status=404) + + gpus = Gpus.objects.filter(node_id=node) + if not gpus: + return Response({"error": "No GPUs found for the given node."}, status=404) + + gpus_data = [ - { - 'id': gpu.id, - 'brand_name': gpu.brand_name, - 'gpu_speed': gpu.gpu_speed, - 'gpu_util': gpu.gpu_util, - 'is_running_amumax': gpu.is_running_amumax, - 'gpu_info': gpu.gpu_info, - 'status': gpu.status, - 'node_id':node_id, - 'last_update': gpu.last_update.strftime('%Y-%m-%d %H:%M:%S') if gpu.last_update else None - } - for gpu in gpus - ] - - return JsonResponse({'gpus': gpus_data}) + { + "id": gpu.id, + "brand_name": gpu.brand_name, + "gpu_speed": gpu.gpu_speed, + "gpu_util": gpu.gpu_util, + "is_running_amumax": gpu.is_running_amumax, + "gpu_info": gpu.gpu_info, + "status": gpu.status, + "node_id": node_id, + "last_update": gpu.last_update.strftime("%Y-%m-%d %H:%M:%S") if gpu.last_update else None + } + for gpu in gpus + ] + + return JsonResponse({"gpus": gpus_data}) class NodeManagementView(APIView): @csrf_exempt @@ -358,7 +367,7 @@ def get(self, request, action, *args, **kwargs): return methods[action](request, task_id=task_id) else: # Redirect or return JSON response for invalid action - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response({"error": "Invalid action"}, status=400) else: return redirect("task_list") @@ -371,7 +380,7 @@ def post(self, request, *args, **kwargs): elif action == "edit_task": return self.edit_task(request, task_id=kwargs.get("task_id")) else: - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response({"error": "Invalid action"}, status=400) else: return redirect("task_list") @@ -383,17 +392,17 @@ def edit_task(self, request, task_id=None): form = EditTaskForm(request.POST, instance=task) if form.is_valid(): form.save() - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response({"message": "Task updated successfully!"}) - elif request.accepted_renderer.format == 'html': - messages.success(request, "Task updated successfully!",extra_tags='primary') + elif request.accepted_renderer.format == "html": + messages.success(request, "Task updated successfully!",extra_tags="primary") return redirect("task_list") else: return redirect("task_list") else: form = EditTaskForm(instance=task) - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response(form.errors, status=400) else: return render(request, "manager/edit_task.html", {"form": form, "task": task}) @@ -402,89 +411,119 @@ def edit_task(self, request, task_id=None): def delete_task(self, request, task_id=None): task = get_object_or_404(Task, pk=task_id) task.delete() - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response({"message": "Task deleted successfully"}) - elif request.accepted_renderer.format == 'html': - messages.success(request, "Task canceled successfully!",extra_tags='primary') + elif request.accepted_renderer.format == "html": + messages.success(request, "Task canceled successfully!",extra_tags="primary") return redirect("task_list") else: return redirect("task_list") def select_gpu_for_task(self): - return Gpus.objects.filter(status=0).first() + return Gpus.objects.filter(status="Waiting").first() def get_priority_task(self): - return Task.objects.filter(status='waiting').order_by('-priority').first() + return Task.objects.filter(status="Waiting").order_by("-priority").first() def run_task(self, request, task_id=None): - task = self.get_priority_task() if task_id is None else get_object_or_404(Task, id=task_id) - gpu = self.select_gpu_for_task() - if not gpu: - message = "No available GPUs." - if request.accepted_renderer.format == 'json': - return Response({"error": message}, status=400) - elif request.accepted_renderer.format == 'html': - messages.success(request, message ,extra_tags='danger') - return redirect("task_list") - else: - return redirect("task_list") ############ PROPABLY IT's the same as above + try: + task = self.get_priority_task() if task_id is None else get_object_or_404(Task, id=task_id) + gpu = self.select_gpu_for_task() + if not gpu: + message = "No available GPUs." + if request.accepted_renderer.format == "json": + return Response({"error": message}, status=400) + elif request.accepted_renderer.format == "html": + messages.success(request, message ,extra_tags="danger") + return redirect("task_list") + else: + return redirect("task_list") ############ PROPABLY IT"s the same as above - task.assigned_gpu_id = f"N{gpu.node_id.id}/G{gpu.id}" - task.assigned_node_id = f"{gpu.node_id.ip}" - task.status = 'Pending' - task.save() + task.assigned_gpu_id = f"N{gpu.node_id.id}/G{gpu.no}" + task.assigned_node_id = f"{gpu.node_id.ip}" + task.status = "Pending" + task.save() - gpu.status = 'Running' - gpu.task_id = task - gpu.save() + gpu.status = "Running" + gpu.task_id = task + gpu.last_update = timezone.now() + gpu.save() - if request.accepted_renderer.format == 'json': - return Response({"message": "Task is running"}) - else: - return redirect("task_list") + message = f"Task {task_id} is pending on {gpu.node_id.id}/{gpu.no}." + if request.accepted_renderer.format == "json": + return Response({"message": message}) + elif request.accepted_renderer.format == "html": + messages.success(request, message ,extra_tags="primary") + return redirect("task_list") + else: + return redirect("task_list") + except Exception as e: + message = f"The task {task_id} has not been started. Error: {e}" + if request.accepted_renderer.format == "json": + return Response({"error": message}, status=400) + elif request.accepted_renderer.format == "html": + messages.success(request, message ,extra_tags="danger") + return redirect("task_list") + else: + return redirect("task_list") def cancel_task(self, request, task_id=None): - task = get_object_or_404(Task, id=task_id) - task.assigned_gpu_id = None - task.assigned_node_id = None - task.status = "Waiting" - task.save() - - gpu = Gpus.objects.get(id=task_id) - gpu.status = "Waiting" - gpu.task_id = None - gpu.last_update = timezone.now() - gpu.save() - - if request.accepted_renderer.format == 'json': - return Response({"message": "Task cancelled"}) - else: - return redirect("task_list") + try: + task = get_object_or_404(Task, id=task_id) + task.assigned_gpu_id = None + task.assigned_node_id = None + task.status = "Waiting" + task.save() + + gpu = get_object_or_404(Gpus, task_id=task) + gpu.status = "Waiting" + gpu.task_id = None + gpu.last_update = timezone.now() + gpu.save() + + message = "Task cancelled." + if request.accepted_renderer.format == "json": + return Response({"message": message}) + elif request.accepted_renderer.format == "html": + messages.success(request, message ,extra_tags="primary") + return redirect("task_list") + else: + return redirect("task_list") + except Exception as e: + message = f"Task has not been canceled. Error: {e}" + if request.accepted_renderer.format == "json": + return Response({"error": message}, status=400) + elif request.accepted_renderer.format == "html": + messages.success(request, message ,extra_tags="danger") + return redirect("task_list") + else: + return redirect("task_list") + def redo_task(self, request, task_id=None): # Logic to redo the task # ... - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response({"message": f"Task {task_id} redo initiated"}) else: return HttpResponse(f"Task {task_id} redo initiated") def add_task_form(self, request,task_id=None): - if request.method == 'GET': + if request.method == "GET": form = AddTaskForm() - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": # W przypadku odpowiedzi JSON, zwracamy pustą strukturę formularza serializer = TaskSerializer(data=request.data) return Response(serializer.data) else: # W przypadku odpowiedzi HTML, renderujemy formularz HTML - return render(request, 'manager/task_form.html', {'form': form}) + return render(request, "manager/task_form.html", {"form": form}) - elif request.method == 'POST': + elif request.method == "POST": # Obsługa żądania POST dla tworzenia nowego zadania - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": data = JSONParser().parse(request) serializer = TaskSerializer(data=request.data) if serializer.is_valid(): @@ -497,16 +536,16 @@ def add_task_form(self, request,task_id=None): form.save() return redirect("task_list") else: - return render(request, 'manager/task_form.html', {'form': form}) + return render(request, "manager/task_form.html", {"form": form}) class TaskListView(APIView): renderer_classes = [TemplateHTMLRenderer, JSONRenderer] def get(self, request): waiting_tasks = Task.objects.filter( - Q(status='Waiting') | Q(status='Waiting') | Q(status='Interrupted') | Q(status=None) + Q(status="Waiting") | Q(status="Waiting") | Q(status="Interrupted") | Q(status=None) ) pending_tasks = Task.objects.filter( - Q(status='Pending') | Q(status='Running') | Q(status=None) + Q(status="Pending") | Q(status="Running") | Q(status=None) ) tasks = Task.objects.all() @@ -517,7 +556,7 @@ def get(self, request): } # Return a JSON response if the request is for API - if request.accepted_renderer.format == 'json': + if request.accepted_renderer.format == "json": return Response(data) # Otherwise, render the HTML template diff --git a/backend/node/functions/gpu_monitor.py b/backend/node/functions/gpu_monitor.py index ae2a8a7..10856e3 100644 --- a/backend/node/functions/gpu_monitor.py +++ b/backend/node/functions/gpu_monitor.py @@ -47,7 +47,7 @@ def check_gpu_load(self, gpu_index=0, check_duration=2, threshold=20): # Return 'free' if the GPU is less loaded than the threshold if gpu_util < threshold and mem_util < threshold: - return 0, gpu_util + return "Waiting", gpu_util except subprocess.CalledProcessError as e: print(f"Error running nvidia-smi: {e}") return "error" @@ -178,7 +178,6 @@ def submit_update_gpu_status(self,node_id): for gpu_key, gpu in self.gpus_status.items(): data = { "action": "update_node_gpu_status", - "no": gpu_key, "brand_name": gpu["name"], "gpu_util": gpu["gpu_util"], "status": gpu["status"], @@ -188,6 +187,8 @@ def submit_update_gpu_status(self,node_id): } try: + # import json + # print(json.dumps(data)) response = requests.post(self.ls.managerNmUrl, data=data) if response.status_code == 200: self.stdout.write( diff --git a/backend/node/node_websocket_client.py b/backend/node/node_websocket_client.py index 95c8df0..514e0ee 100644 --- a/backend/node/node_websocket_client.py +++ b/backend/node/node_websocket_client.py @@ -1,17 +1,18 @@ import asyncio import websockets import json -from asgiref.sync import sync_to_async import logging +from django.shortcuts import get_object_or_404 +from asgiref.sync import sync_to_async async def get_node_id(): from node.models import Local - from asgiref.sync import sync_to_async - node_id = await sync_to_async(Local.objects.get, thread_sensitive=True)(id=1) - return node_id.id + local = await sync_to_async(get_object_or_404)(Local, pk=1) + return int(local.node_id) async def connect_to_manager(): uri = "ws://localhost:8000/ws/node/" + node_id = await get_node_id() while True: try: async with websockets.connect(uri) as websocket: @@ -23,9 +24,10 @@ async def connect_to_manager(): try: message = await websocket.recv() data = json.loads(message) + print(message) command = data.get("command") r_node_id = data.get("node_id") - node_id = await get_node_id() + if command == "update_gpus" and r_node_id == node_id: await sync_to_async(execute_update_gpus)(node_id)