Skip to content

Commit

Permalink
v0.0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuMoalic committed Mar 28, 2024
1 parent 9384b23 commit 70a07d5
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 187 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@ venv.bak/
**/dev-dist
manager/db.sqlite3-journal
manager/schema.yml
staging
6 changes: 5 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ manager:
--name amuman-manager-staging \
--network amuman-staging \
-v ./staging:/manager \
-v ./mock_nas:/mnt/smb \
-e SECRET_KEY=$SECRET_KEY \
-e DJANGO_SUPERUSER_EMAIL=$DJANGO_SUPERUSER_EMAIL \
-e DJANGO_SUPERUSER_USERNAME=$DJANGO_SUPERUSER_USERNAME \
-e DJANGO_SUPERUSER_PASSWORD=$DJANGO_SUPERUSER_PASSWORD \
-e DOMAIN=$DOMAIN \
-e REDIS_HOST=amuman-redis-staging \
-e SHARED_FOLDER=/mnt/smb \
amuman-manager-staging

node:
Expand All @@ -42,9 +44,11 @@ node:
podman run --rm -it --replace --tz local --pull newer \
--name amuman-node-staging \
--device=nvidia.com/gpu=all \
-v ./mock_nas:/mnt/smb \
-v ./mock_nas:/shared \
-v ./staging/node_config:/config \
-e MANAGER_DOMAIN=$DOMAIN \
-e NODE_NAME=staging-node-1 \
-e SHARED_FOLDER=/shared \
amuman-node-staging

staging: frontend redis manager proxy
Expand Down
6 changes: 3 additions & 3 deletions manager/amuman/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@
"manager.middleware.scheduler_middleware.SchedulerMiddleware",
]
if DEBUG:
MIDDLEWARE.append(
"manager.middleware.generate_initial_data.GenerateRandomJobsMiddleware"
)
MIDDLEWARE.append("manager.middleware.generate_initial_data_debug.Generate")
else:
MIDDLEWARE.append("manager.middleware.generate_initial_data_prod.Generate")


ROOT_URLCONF = "amuman.urls"
Expand Down
6 changes: 5 additions & 1 deletion manager/manager/components/check_mx3_file.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging
import os
from pathlib import Path

log = logging.getLogger("rich")

SHARED_FOLDER = Path(os.environ.get("SHARED_FOLDER", "/mnt/smb"))


def validate_mx3_file(path_str: str) -> bool:
path = Path(path_str)
path = SHARED_FOLDER / Path(path_str)

if not path.exists():
log.error(f"File does not exist: {path}")
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
log = logging.getLogger("rich")


class GenerateRandomJobsMiddleware:
class Generate:
def __init__(self, get_response):
self.get_response = get_response
self.generate_users()
self.generate_random_jobs()
raise MiddlewareNotUsed(
"GenerateRandomJobsMiddleware is disabled after initial use."
)
raise MiddlewareNotUsed("Generate is disabled after initial use.")

def generate_users(self):
if not CustomUser.objects.exists():
Expand Down
25 changes: 25 additions & 0 deletions manager/manager/middleware/generate_initial_data_prod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging

from django.contrib.auth.models import User as AuthUser
from django.core.exceptions import MiddlewareNotUsed

from manager.models import CustomUser

log = logging.getLogger("rich")


class Generate:
    """One-shot startup hook that seeds the initial ``CustomUser`` row.

    Django instantiates middleware once at start-up; this class uses that
    moment to make sure a ``CustomUser`` exists for the admin account and
    then raises ``MiddlewareNotUsed`` so it is dropped from the request
    chain.
    """

    def __init__(self, get_response):
        """Seed the initial data, then remove this middleware from the chain.

        Raises:
            MiddlewareNotUsed: always, once the seeding step has run.
        """
        self.get_response = get_response
        self.generate_users()
        raise MiddlewareNotUsed("Generate is disabled after initial use.")

    def generate_users(self):
        """Create the admin ``CustomUser`` when no ``CustomUser`` exists yet.

        The auth account is looked up by ``DJANGO_SUPERUSER_USERNAME``
        (falling back to ``"admin"``). If that account does not exist the
        step is skipped with a warning instead of raising, because an
        exception here would propagate out of middleware construction.
        """
        # Local import: only this method needs it.
        import os

        if CustomUser.objects.exists():
            return

        # NOTE(review): the container is started with
        # DJANGO_SUPERUSER_USERNAME; presumably that is the account created
        # at start-up -- confirm against the manager entrypoint.
        username = os.environ.get("DJANGO_SUPERUSER_USERNAME", "admin")
        try:
            admin = AuthUser.objects.get(username=username)
        except AuthUser.DoesNotExist:
            log.warning(
                f"Auth user {username!r} not found; skipping initial CustomUser creation."
            )
            return

        self.admin_user = CustomUser(auth=admin, concurrent_jobs=20)
        self.admin_user.save()

    def __call__(self, request):
        """Pass the request straight through to the next handler."""
        return self.get_response(request)
2 changes: 1 addition & 1 deletion manager/manager/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def create(self, validated_data):
"username": validated_data.pop("username"),
"password": validated_data.pop("password"),
"email": validated_data.pop("email"),
"is_active": False,
"is_active": True,
}
# Check if username already exists
if User.objects.filter(username=user_data["username"]).exists():
Expand Down
2 changes: 1 addition & 1 deletion node/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ COPY . .
RUN pip install .
ENV SMB_MOUNT_POINT=/mnt/smb

ENTRYPOINT ["/app/entrypoint.sh"]
CMD amuman-node
69 changes: 58 additions & 11 deletions node/amuman_node/api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import os
from typing import Any, Dict

import requests

from amuman_node.config import Config
from amuman_node.job import Job

log = logging.getLogger("rich")
Expand All @@ -12,23 +12,50 @@


class API:
def __init__(self):
self.url = f"{os.environ['MANAGER_URL']}/api"
self.node_user: str = os.environ["NODE_NAME"]
self.node_password: str = os.environ["NODE_PASSWORD"]
def __init__(self, config: Config):
self.config = config
self.url = f"https://{config.manager_domain}/api"
log.debug(f"API URL: {self.url}")
self.access_token = None
self.refresh_token = None
self.headers = {}

def create_user_if_doesnt_exist(self) -> None:
    """Create the node's user on the manager unless it is already listed.

    Fetches the user list via ``get_users`` and looks for an entry whose
    ``auth.username`` equals ``self.config.name``; calls ``post_user`` when
    no such entry is found.
    """
    log.debug("Checking if node user exists...")
    # NOTE(review): assumes the endpoint returns a plain list of user
    # objects (not a paginated {"results": [...]} payload) -- confirm.
    listing = self.get_users().json()  # ["results"]
    log.debug(f"Users: {listing}")

    wanted = self.config.name
    if any(entry["auth"]["username"] == wanted for entry in listing):
        log.debug("Node user exists.")
        return

    log.debug("Node user does not exist. Creating...")
    self.post_user()

def authenticate(self) -> bool:
# self.create_user_if_doesnt_exist()
try:
res = self.post_user()
if res.status_code == 201:
log.debug("Node user created successfully")
else:
log.debug(
f"Node user creation failed: {res.status_code=}, {res.json()=}"
)
except requests.exceptions.RequestException as e:
log.exception(f"Error creating the node user: {e}")

log.debug("Authenticating...")
try:
data = {
"username": self.config.name,
"password": self.config.password,
}
log.debug(f"Authenticating with {self.url}/token/ {data=}")
response = requests.post(
self.url + "/token/",
json={
"username": self.node_user,
"password": self.node_password,
},
json=data,
)
log.debug(
f"Authentication response: {response.status_code=}, {response.json()=}"
Expand All @@ -45,7 +72,27 @@ def authenticate(self) -> bool:
log.error("Unable to authenticate with the manager")
return False

def register(self, data: Data) -> requests.Response:
def get_users(self) -> requests.Response:
    """GET ``<api url>/users/`` with the current headers and return the response."""
    endpoint = f"{self.url}/users/"
    return requests.get(endpoint, headers=self.headers)

def post_user(self) -> requests.Response:
    """POST the node's credentials to ``<api url>/users/`` and return the response.

    The payload carries the configured name and password plus an email of
    the form ``<name>@localhost``.
    """
    node_name = self.config.name
    payload = {
        "username": node_name,
        "password": self.config.password,
        "email": f"{node_name}@localhost",
    }
    endpoint = f"{self.url}/users/"
    return requests.post(endpoint, headers=self.headers, json=payload)

def post_node(self, data: Data) -> requests.Response:
res = requests.post(
self.url + "/nodes/",
headers=self.headers,
Expand All @@ -61,7 +108,7 @@ def post_gpu(self, data: Data) -> requests.Response:
)
return res

def update_job(self, job: Job) -> requests.Response:
def put_job(self, job: Job) -> requests.Response:
res = requests.put(
self.url + f"/jobs/{job.id}/",
headers=self.headers,
Expand Down
46 changes: 46 additions & 0 deletions node/amuman_node/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
import logging
import os
import random
from pathlib import Path

log = logging.getLogger("rich")


class Config:
    """Node identity and connection settings, persisted as a JSON file.

    Attributes:
        path: location of the JSON file the settings are read from and
            written back to.
        name: node name, also used as the username on the manager.
        password: node password; generated once and then reused.
        manager_domain: domain of the manager the node talks to.
    """

    # Default location of the persisted configuration.
    DEFAULT_PATH = Path("/config/config.json")

    def __init__(self, path: Path = DEFAULT_PATH):
        """Load (or create) the configuration stored at *path*."""
        self.path: Path = Path(path)
        self.name: str
        self.password: str
        self.manager_domain: str
        self.read_config()

    def read_config(self):
        """Resolve the settings, then persist them with ``write_config``.

        Precedence per setting:

        * ``name``: ``NODE_NAME`` env var, then the stored value, then the
          ``HOST`` env var, then ``"1000000000000"``.
        * ``manager_domain``: ``MANAGER_DOMAIN`` env var, then the stored
          value, then ``"localhost"``.
        * ``password``: the stored value, otherwise a freshly generated one.
        """
        # Local import: only the password generation below needs it.
        import secrets

        stored: dict = {}
        if self.path.exists():
            with open(self.path) as f:
                stored = json.load(f)
            # The password is deliberately kept out of the log output.
            log.debug(
                f"Config read from file: name={stored.get('name')!r}, "
                f"manager_domain={stored.get('manager_domain')!r}"
            )

        name = os.getenv("NODE_NAME")
        if name is None:
            # Keep the persisted name: the stored password belongs to it.
            name = stored.get("name")
        if name is None:
            name = os.getenv("HOST", str(int(1e12)))
        self.name = name

        # Assign unconditionally so the attribute exists even when there is
        # no config file yet (first run).
        password = stored.get("password")
        if password is None:
            # Same numeric format and range as before, drawn from a CSPRNG.
            password = str(secrets.randbelow(int(1e12) + 1))
        self.password = password

        domain = os.getenv("MANAGER_DOMAIN")
        if domain is None:
            domain = stored.get("manager_domain")
        if domain is None:
            domain = "localhost"
        self.manager_domain = domain

        self.write_config()
        log.debug(f"Config: {self.name=}, {self.manager_domain=}")

    def write_config(self):
        """Write the current settings to ``self.path`` as indented JSON."""
        config = {
            "name": self.name,
            "password": self.password,
            "manager_domain": self.manager_domain,
        }
        # create the directory if it doesn't exist
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "w") as f:
            json.dump(config, f, indent=4)
30 changes: 21 additions & 9 deletions node/amuman_node/job_manager.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
import asyncio
import logging
import os
from datetime import datetime
from pathlib import Path

from amuman_node.api import API

from .job import Job, JobStatus

log = logging.getLogger("rich")

SHARED_FOLDER = Path(os.environ.get("SHARED_FOLDER", "/mnt/smb"))


class JobRunner:
def __init__(self, node_id: int, api: API, job_id: int) -> None:
def __init__(self, node_id: int, api: API, job_id: int, gpu_device_id: int) -> None:
self.node_id: int = node_id
self.api: API = api
self.subprocess: asyncio.subprocess.Process
self.job: Job = self.api.get_job(job_id)
self.gpu_device_id = gpu_device_id
self.async_task = asyncio.create_task(self.run_subprocess())

# TODO: Check if interrupted
# job.end_time = datetime.now().isoformat()
# job.status = "Finished"

async def run_subprocess(self) -> None:
cmd: list[str] = ["amumax", "-gpu=1", "-magnets=false", self.job.path]
cmd: list[str] = [
"amumax",
f"-gpu={self.gpu_device_id}",
"-magnets=false",
str(SHARED_FOLDER / Path(self.job.path)),
]
log.debug(f"Starting subprocess for job ID: {self.job.id} with command: {cmd}")

try:
Expand Down Expand Up @@ -62,14 +72,16 @@ async def _handle_completion(self) -> None:
log.debug(f"AMUmax exited with status {self.job.status.name}.")
self.job.end_time = datetime.now().isoformat()
try:
res = self.api.update_job(self.job)
res = self.api.put_job(self.job)
if res.status_code not in [200, 201]:
log.error(
f"Failed to update job ID: {self.job.id}. Status Code: {res.status_code}. Response: {res.json()}"
)
return
except Exception as e:
log.error(f"Failed to update job ID: {self.job.id}. Error: {e}")
return
log.debug(
f"Job ID: {self.job.id}(completed) updated with status: {self.job.status.name}."
)
log.debug(f"Response: {res.json()}")
log.debug(f"Job ID: {self.job.id} updated with status: {self.job.status.name}.")

async def _handle_error(self, error: Exception) -> None:
error_message: str = ""
Expand All @@ -87,7 +99,7 @@ async def _handle_error(self, error: Exception) -> None:
log.error(error_message)
self.job.status = JobStatus.INTERRUPTED
self.job.error = error_message
res = self.api.update_job(self.job)
res = self.api.put_job(self.job)
log.debug(
f"Job ID: {self.job.id}(error) updated with status: {self.job.status.name}. Response: {res.json()}"
)
Expand All @@ -97,4 +109,4 @@ async def stop_process(self) -> None:
log.debug(f"Stopping amumax for job ID: {self.job.id}")
self.subprocess.terminate()
self.job.status = JobStatus.INTERRUPTED
self.api.update_job(self.job)
self.api.put_job(self.job)
Loading

0 comments on commit 70a07d5

Please sign in to comment.