Skip to content

Commit

Permalink
Update node_manage.html and gpu_monitor.py, and fix node_websocket_client.py
Browse files Browse the repository at this point in the history
  • Loading branch information
kkingstoun committed Jan 29, 2024
1 parent 88194f2 commit a7b8d66
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 87 deletions.
4 changes: 2 additions & 2 deletions backend/manager/templates/manager/node_manage.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% extends "manager/base.html" %}

{% block title %}Manage node{% endblock %}
{% block name %}Manage node{% endblock %}
{% block title %}Manage node {{node_id}}{% endblock %}
{% block name %}Manage node {{node_id}}{% endblock %}
{% block content %}
<div class="container">
<div class="mb-3">
Expand Down
195 changes: 117 additions & 78 deletions backend/manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from datetime import timedelta
import time

@csrf_exempt
def add_task(request):
Expand Down Expand Up @@ -144,7 +145,7 @@ def manage_node(self, request, node_id):
# Logika do wyświetlenia szczegółów i zarządzania konkretnym węzłem
gpu_list = Gpus.objects.all()
gpus = Gpus.objects.filter(node_id=node_id)
return render(request, "manager/node_manage.html", {"gpus": gpus,"node_id":1})
return render(request, "manager/node_manage.html", {"gpus": gpus,"node_id":node_id})


# def refresh_gpus(self,request, node_id):
Expand All @@ -159,7 +160,7 @@ def manage_node(self, request, node_id):
# )
# gpus = Gpus.objects.filter(node_id=node_id)
# # Przekierowanie z powrotem do strony zarządzania nodem
# return redirect(reverse('manage_node', kwargs={'node_id': node_id}))
# return redirect(reverse("manage_node", kwargs={"node_id": node_id}))

def refresh_gpus_ajax(request, node_id):

Expand All @@ -172,24 +173,32 @@ def refresh_gpus_ajax(request, node_id):
"node_id":node_id,
}
)

gpus = Gpus.objects.filter(node_id=node_id)
time.sleep(3)
node = get_object_or_404(Nodes, id=node_id)
if not node:
return Response({"error": "Node not found."}, status=404)

gpus = Gpus.objects.filter(node_id=node)
if not gpus:
return Response({"error": "No GPUs found for the given node."}, status=404)


gpus_data = [
{
'id': gpu.id,
'brand_name': gpu.brand_name,
'gpu_speed': gpu.gpu_speed,
'gpu_util': gpu.gpu_util,
'is_running_amumax': gpu.is_running_amumax,
'gpu_info': gpu.gpu_info,
'status': gpu.status,
'node_id':node_id,
'last_update': gpu.last_update.strftime('%Y-%m-%d %H:%M:%S') if gpu.last_update else None
}
for gpu in gpus
]

return JsonResponse({'gpus': gpus_data})
{
"id": gpu.id,
"brand_name": gpu.brand_name,
"gpu_speed": gpu.gpu_speed,
"gpu_util": gpu.gpu_util,
"is_running_amumax": gpu.is_running_amumax,
"gpu_info": gpu.gpu_info,
"status": gpu.status,
"node_id": node_id,
"last_update": gpu.last_update.strftime("%Y-%m-%d %H:%M:%S") if gpu.last_update else None
}
for gpu in gpus
]
return JsonResponse({"gpus": gpus_data})

class NodeManagementView(APIView):
@csrf_exempt
Expand Down Expand Up @@ -358,7 +367,7 @@ def get(self, request, action, *args, **kwargs):
return methods[action](request, task_id=task_id)
else:
# Redirect or return JSON response for invalid action
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response({"error": "Invalid action"}, status=400)
else:
return redirect("task_list")
Expand All @@ -371,7 +380,7 @@ def post(self, request, *args, **kwargs):
elif action == "edit_task":
return self.edit_task(request, task_id=kwargs.get("task_id"))
else:
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response({"error": "Invalid action"}, status=400)
else:
return redirect("task_list")
Expand All @@ -383,17 +392,17 @@ def edit_task(self, request, task_id=None):
form = EditTaskForm(request.POST, instance=task)
if form.is_valid():
form.save()
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response({"message": "Task updated successfully!"})
elif request.accepted_renderer.format == 'html':
messages.success(request, "Task updated successfully!",extra_tags='primary')
elif request.accepted_renderer.format == "html":
messages.success(request, "Task updated successfully!",extra_tags="primary")
return redirect("task_list")
else:
return redirect("task_list")
else:
form = EditTaskForm(instance=task)

if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response(form.errors, status=400)
else:
return render(request, "manager/edit_task.html", {"form": form, "task": task})
Expand All @@ -402,89 +411,119 @@ def edit_task(self, request, task_id=None):
def delete_task(self, request, task_id=None):
task = get_object_or_404(Task, pk=task_id)
task.delete()
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response({"message": "Task deleted successfully"})
elif request.accepted_renderer.format == 'html':
messages.success(request, "Task canceled successfully!",extra_tags='primary')
elif request.accepted_renderer.format == "html":
messages.success(request, "Task canceled successfully!",extra_tags="primary")
return redirect("task_list")
else:
return redirect("task_list")

def select_gpu_for_task(self):
return Gpus.objects.filter(status=0).first()
return Gpus.objects.filter(status="Waiting").first()

def get_priority_task(self):
return Task.objects.filter(status='waiting').order_by('-priority').first()
return Task.objects.filter(status="Waiting").order_by("-priority").first()

def run_task(self, request, task_id=None):
task = self.get_priority_task() if task_id is None else get_object_or_404(Task, id=task_id)
gpu = self.select_gpu_for_task()
if not gpu:
message = "No available GPUs."
if request.accepted_renderer.format == 'json':
return Response({"error": message}, status=400)
elif request.accepted_renderer.format == 'html':
messages.success(request, message ,extra_tags='danger')
return redirect("task_list")
else:
return redirect("task_list") ############ PROPABLY IT's the same as above
try:
task = self.get_priority_task() if task_id is None else get_object_or_404(Task, id=task_id)
gpu = self.select_gpu_for_task()
if not gpu:
message = "No available GPUs."
if request.accepted_renderer.format == "json":
return Response({"error": message}, status=400)
elif request.accepted_renderer.format == "html":
messages.success(request, message ,extra_tags="danger")
return redirect("task_list")
else:
return redirect("task_list") ############ PROBABLY it's the same as above

task.assigned_gpu_id = f"N{gpu.node_id.id}/G{gpu.id}"
task.assigned_node_id = f"{gpu.node_id.ip}"
task.status = 'Pending'
task.save()
task.assigned_gpu_id = f"N{gpu.node_id.id}/G{gpu.no}"
task.assigned_node_id = f"{gpu.node_id.ip}"
task.status = "Pending"
task.save()

gpu.status = 'Running'
gpu.task_id = task
gpu.save()
gpu.status = "Running"
gpu.task_id = task
gpu.last_update = timezone.now()
gpu.save()


if request.accepted_renderer.format == 'json':
return Response({"message": "Task is running"})
else:
return redirect("task_list")
message = f"Task {task_id} is pending on {gpu.node_id.id}/{gpu.no}."
if request.accepted_renderer.format == "json":
return Response({"message": message})
elif request.accepted_renderer.format == "html":
messages.success(request, message ,extra_tags="primary")
return redirect("task_list")
else:
return redirect("task_list")
except Exception as e:
message = f"The task {task_id} has not been started. Error: {e}"
if request.accepted_renderer.format == "json":
return Response({"error": message}, status=400)
elif request.accepted_renderer.format == "html":
messages.success(request, message ,extra_tags="danger")
return redirect("task_list")
else:
return redirect("task_list")

def cancel_task(self, request, task_id=None):
task = get_object_or_404(Task, id=task_id)
task.assigned_gpu_id = None
task.assigned_node_id = None
task.status = "Waiting"
task.save()

gpu = Gpus.objects.get(id=task_id)
gpu.status = "Waiting"
gpu.task_id = None
gpu.last_update = timezone.now()
gpu.save()

if request.accepted_renderer.format == 'json':
return Response({"message": "Task cancelled"})
else:
return redirect("task_list")
try:
task = get_object_or_404(Task, id=task_id)
task.assigned_gpu_id = None
task.assigned_node_id = None
task.status = "Waiting"
task.save()

gpu = get_object_or_404(Gpus, task_id=task)
gpu.status = "Waiting"
gpu.task_id = None
gpu.last_update = timezone.now()
gpu.save()

message = "Task cancelled."
if request.accepted_renderer.format == "json":
return Response({"message": message})
elif request.accepted_renderer.format == "html":
messages.success(request, message ,extra_tags="primary")
return redirect("task_list")
else:
return redirect("task_list")
except Exception as e:
message = f"Task has not been canceled. Error: {e}"
if request.accepted_renderer.format == "json":
return Response({"error": message}, status=400)
elif request.accepted_renderer.format == "html":
messages.success(request, message ,extra_tags="danger")
return redirect("task_list")
else:
return redirect("task_list")


def redo_task(self, request, task_id=None):
# Logic to redo the task
# ...

if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response({"message": f"Task {task_id} redo initiated"})
else:
return HttpResponse(f"Task {task_id} redo initiated")

def add_task_form(self, request,task_id=None):
if request.method == 'GET':
if request.method == "GET":
form = AddTaskForm()
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
# W przypadku odpowiedzi JSON, zwracamy pustą strukturę formularza
serializer = TaskSerializer(data=request.data)
return Response(serializer.data)
else:
# W przypadku odpowiedzi HTML, renderujemy formularz HTML
return render(request, 'manager/task_form.html', {'form': form})
return render(request, "manager/task_form.html", {"form": form})

elif request.method == 'POST':
elif request.method == "POST":
# Obsługa żądania POST dla tworzenia nowego zadania
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
data = JSONParser().parse(request)
serializer = TaskSerializer(data=request.data)
if serializer.is_valid():
Expand All @@ -497,16 +536,16 @@ def add_task_form(self, request,task_id=None):
form.save()
return redirect("task_list")
else:
return render(request, 'manager/task_form.html', {'form': form})
return render(request, "manager/task_form.html", {"form": form})

class TaskListView(APIView):
renderer_classes = [TemplateHTMLRenderer, JSONRenderer]
def get(self, request):
waiting_tasks = Task.objects.filter(
Q(status='Waiting') | Q(status='Waiting') | Q(status='Interrupted') | Q(status=None)
Q(status="Waiting") | Q(status="Waiting") | Q(status="Interrupted") | Q(status=None)
)
pending_tasks = Task.objects.filter(
Q(status='Pending') | Q(status='Running') | Q(status=None)
Q(status="Pending") | Q(status="Running") | Q(status=None)
)

tasks = Task.objects.all()
Expand All @@ -517,7 +556,7 @@ def get(self, request):
}

# Return a JSON response if the request is for API
if request.accepted_renderer.format == 'json':
if request.accepted_renderer.format == "json":
return Response(data)

# Otherwise, render the HTML template
Expand Down
5 changes: 3 additions & 2 deletions backend/node/functions/gpu_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def check_gpu_load(self, gpu_index=0, check_duration=2, threshold=20):

# Return the free-GPU status together with the GPU utilisation when both GPU and memory utilisation are below the threshold
if gpu_util < threshold and mem_util < threshold:
return 0, gpu_util
return "Waiting", gpu_util
except subprocess.CalledProcessError as e:
print(f"Error running nvidia-smi: {e}")
return "error"
Expand Down Expand Up @@ -178,7 +178,6 @@ def submit_update_gpu_status(self,node_id):
for gpu_key, gpu in self.gpus_status.items():
data = {
"action": "update_node_gpu_status",
"no": gpu_key,
"brand_name": gpu["name"],
"gpu_util": gpu["gpu_util"],
"status": gpu["status"],
Expand All @@ -188,6 +187,8 @@ def submit_update_gpu_status(self,node_id):
}

try:
# import json
# print(json.dumps(data))
response = requests.post(self.ls.managerNmUrl, data=data)
if response.status_code == 200:
self.stdout.write(
Expand Down
12 changes: 7 additions & 5 deletions backend/node/node_websocket_client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import websockets
import json
from asgiref.sync import sync_to_async
import logging
from django.shortcuts import get_object_or_404
from asgiref.sync import sync_to_async

async def get_node_id():
from node.models import Local
from asgiref.sync import sync_to_async
node_id = await sync_to_async(Local.objects.get, thread_sensitive=True)(id=1)
return node_id.id
local = await sync_to_async(get_object_or_404)(Local, pk=1)
return int(local.node_id)

async def connect_to_manager():
uri = "ws://localhost:8000/ws/node/"
node_id = await get_node_id()
while True:
try:
async with websockets.connect(uri) as websocket:
Expand All @@ -23,9 +24,10 @@ async def connect_to_manager():
try:
message = await websocket.recv()
data = json.loads(message)
print(message)
command = data.get("command")
r_node_id = data.get("node_id")
node_id = await get_node_id()


if command == "update_gpus" and r_node_id == node_id:
await sync_to_async(execute_update_gpus)(node_id)
Expand Down

0 comments on commit a7b8d66

Please sign in to comment.