From aebcbbb2814ed9ef26d2e6f003cc9ffe8e73a0e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Enrique=20Tom=C3=A1s?= <64653672+enriquetomasmb@users.noreply.github.com>
Date: Mon, 16 Dec 2024 11:32:33 +0100
Subject: [PATCH] [Feature/simultaneous_queues] Improved version for scenario
 management (#29)

* feature: multiple queues working with different users using the default scenario configuration
* fix: network prefix changed from 192.168.1 to 192.168.2
* Fix different queue for each user
* feature: show scenarios in the dashboard depending on the role of the user
* refactor: changed location of utils.py
* fix: nebula-frontend port
* fix: launching scenarios with the same user in different frontend instances
* fix: stopping only the current instance
* fix: remove instance network
* fix: killing the other instance's docker containers when starting a new scenario
* fix: stop bugs
* feature: display queues for each user
* Fix persistence of session and dbs
* Fix persistence of session
* fix: scenario database, database dashboard
* Minor changes in user management
* Improve updating of docker images
* remove reconnections (more debugging needed)
* fix: scenario title set to empty when there's no scenario title
* fix: redirect to dashboard.py
* fix: waf containers working in production mode
* fix: change user in config files automatically
* fix: remove unnecessary variable user
* remove unused functions
* fix: blockchain network endpoint
* fix: deploying different scenarios with processes
* define NEBULA Controller API, update Makefile
* enable controller endpoint from the frontend
* fix: connection between nodes in process deployment
* refactor: rename fronted.log to frontend.log in run_frontend
* feature: halt scenarios when RAM or GPU usage is maxed out
* fix: obtain list of available gpus based on user
* fix: gpus list
* fix: docker ips and gpus lists
* Fix statistics generation

---------

Co-authored-by: FerTV
---
 Makefile | 79 +-
 app/main.py | 33 +-
 docs/_prebuilt/installation.md | 2 +-
 nebula/addons/waf/Dockerfile-grafana | 5 +
 nebula/addons/waf/Dockerfile-loki | 2 +
 nebula/addons/waf/Dockerfile-promtail | 7 +
 nebula/addons/waf/Dockerfile-waf | 6 +
 nebula/addons/waf/default.conf | 2 +-
 nebula/addons/waf/grafana/automatic.yml | 2 +-
 nebula/addons/waf/promtail-config.yml | 2 +-
 nebula/controller.py | 735 +++++++++---------
 nebula/core/network/connection.py | 4 +-
 nebula/core/training/lightning.py | 11 +-
 nebula/frontend/app.py | 375 ++++++---
 .../frontend/config/participant.json.example | 1 +
 nebula/frontend/database.py | 228 ++++--
 nebula/frontend/templates/admin.html | 52 +-
 nebula/frontend/templates/dashboard.html | 49 +-
 nebula/frontend/templates/deployment.html | 16 +-
 nebula/frontend/utils.py | 15 -
 nebula/scenarios.py | 377 +++------
 nebula/utils.py | 156 ++++
 pyproject.toml | 21 +
 23 files changed, 1241 insertions(+), 939 deletions(-)
 delete mode 100644 nebula/frontend/utils.py
 create mode 100644 nebula/utils.py

diff --git a/Makefile b/Makefile
index 817730bc..7d3d074e 100644
--- a/Makefile
+++ b/Makefile
@@ -3,13 +3,6 @@ PYTHON_VERSION := 3.11
 UV_INSTALL_SCRIPT := https://astral.sh/uv/install.sh
 PATH := $(HOME)/.local/bin:$(PATH)
 
-command_exists = $(shell command -v $(1) >/dev/null 2>&1 && echo true || echo false)
-
-define install_uv
-	@echo "πŸ“¦ uv is not installed. Installing uv..."
-	@curl -LsSf $(UV_INSTALL_SCRIPT) | sh
-endef
-
 .PHONY: check-uv
 check-uv: ## Check and install uv if necessary
 	@if command -v $(UV) >/dev/null 2>&1; then \
@@ -33,33 +26,26 @@ install-python: check-uv ## Install Python with uv
 .PHONY: install
 install: install-python ## Install core dependencies
 	@echo "πŸ“¦ Installing core dependencies with uv"
-	@$(UV) sync --group core
+	@$(UV) sync --group controller --group core
 	@echo "πŸ”§ Installing pre-commit hooks"
 	@$(UV) run pre-commit install
 	@echo ""
-	@echo "🐳 Building nebula-frontend docker image. Do you want to continue (overrides existing image)? (y/n)"
-	@read ans; if [ "$${ans:-N}" = y ]; then \
-		docker build -t nebula-frontend -f nebula/frontend/Dockerfile .; \
-	else \
-		echo "Skipping nebula-frontend docker build."; \
-	fi
-	@echo ""
-	@echo "🐳 Building nebula-core docker image. Do you want to continue? (overrides existing image)? (y/n)"
-	@read ans; if [ "$${ans:-N}" = y ]; then \
-		docker build -t nebula-core .; \
-	else \
-		echo "Skipping nebula-core docker build."; \
-	fi
+	@$(MAKE) update
 	@echo ""
 	@$(MAKE) shell
 
-.PHONY: full-install
-full-install: install-python ## Install all dependencies (core, docs)
-	@echo "πŸ“¦ Installing all dependencies with uv"
-	@$(UV) sync --group core --group docs
-	@echo "πŸ”§ Installing pre-commit hooks"
-	@$(UV) run pre-commit install
-	@$(MAKE) shell
+.PHONY: install-production
+install-production: install ## Install production dependencies
+	@echo "🐳 Updating production docker images..."
+	@echo "🐳 Building nebula-waf"
+	@docker build -t nebula-waf -f nebula/addons/waf/Dockerfile-waf --build-arg USER=$(USER) nebula/addons/waf
+	@echo "🐳 Building nebula-loki"
+	@docker build -t nebula-waf-loki -f nebula/addons/waf/Dockerfile-loki nebula/addons/waf
+	@echo "🐳 Building nebula-promtail"
+	@docker build -t nebula-waf-promtail -f nebula/addons/waf/Dockerfile-promtail --build-arg USER=$(USER) nebula/addons/waf
+	@echo "🐳 Building nebula-grafana"
+	@docker build -t nebula-waf-grafana -f nebula/addons/waf/Dockerfile-grafana --build-arg USER=$(USER) nebula/addons/waf
+	@echo "🐳 Docker images updated."
 
 .PHONY: shell
 shell: ## Start a shell in the uv environment
@@ -79,6 +65,24 @@ shell: ## Start a shell in the uv environment
 	echo "πŸš€ Created by \033[1;34mEnrique TomΓ‘s MartΓ­nez BeltrΓ‘n\033[0m <\033[1;34menriquetomas@um.es\033[0m>"; \
 	fi
 
+.PHONY: update
+update: ## Update docker images
+	@echo "🐳 Updating docker images..."
+	@echo "🐳 Building nebula-frontend docker image. Do you want to continue (overrides existing image)? (y/n)"
+	@read ans; if [ "$${ans:-N}" = y ]; then \
+		docker build -t nebula-frontend -f nebula/frontend/Dockerfile .; \
+	else \
+		echo "Skipping nebula-frontend docker build."; \
+	fi
+	@echo ""
+	@echo "🐳 Building nebula-core docker image. Do you want to continue (overrides existing image)? (y/n)"
+	@read ans; if [ "$${ans:-N}" = y ]; then \
+		docker build -t nebula-core .; \
+	else \
+		echo "Skipping nebula-core docker build."; \
+	fi
+	@echo "🐳 Docker images updated."
+
 .PHONY: lock
 lock: ## Update the lock file
 	@echo "πŸ”’ This will update the lock file. Do you want to continue? (y/n)"
@@ -86,13 +90,6 @@ lock: ## Update the lock file
 	@echo "πŸ”’ Locking dependencies..."
 	@$(UV) lock
 
-.PHONY: update-libs
-update-libs: ## Update libraries to the latest version
-	@echo "πŸ”§ This will override the versions of current libraries. Do you want to continue? (y/n)"
-	@read ans && [ $${ans:-N} = y ] || { echo "Update cancelled."; exit 1; }
-	@echo "πŸ“¦ Updating libraries..."
-	@$(UV) update
-
 .PHONY: check
 check: ## Run code quality tools
 	@echo "πŸ› οΈ Running code quality checks"
@@ -127,6 +124,12 @@ publish: ## Publish a release to PyPI
 .PHONY: build-and-publish
 build-and-publish: build publish ## Build and publish the package
 
+.PHONY: doc-install
+doc-install: install-python ## Install dependencies for documentation
+	@echo "πŸ“¦ Installing doc dependencies with uv"
+	@$(UV) sync --group core --group docs
+	@$(MAKE) shell
+
 .PHONY: doc-test
 doc-test: ## Test if documentation can be built without errors
 	@$(UV) run mkdocs build -f docs/mkdocs.yml -d _build -s
@@ -139,12 +142,6 @@
 doc-build: ## Build the documentation
 	@$(UV) run mkdocs build -f docs/mkdocs.yml -d _build
 
 doc-serve: ## Serve the documentation locally
 	@$(UV) run mkdocs serve -f docs/mkdocs.yml
 
-.PHONY: format
-format: ## Format code with black and isort
-	@echo "🎨 Formatting code"
-	@$(UV) run black .
-	@$(UV) run isort .
-
 .PHONY: clean
 clean: clean-build ## Clean up build artifacts and caches
 	@echo "🧹 Cleaning up build artifacts and caches"
diff --git a/app/main.py b/app/main.py
index e601a638..c2acdd9a 100755
--- a/app/main.py
+++ b/app/main.py
@@ -9,6 +9,14 @@
 argparser = argparse.ArgumentParser(description="Controller of NEBULA platform", add_help=False)
 
+argparser.add_argument(
+    "-cp",
+    "--controllerport",
+    dest="controllerport",
+    default=5000,
+    help="Controller port (default: 5000)",
+)
+
 argparser.add_argument(
     "--grafanaport",
     dest="grafanaport",
@@ -38,6 +46,14 @@
     help="Frontend port (default: 6060)",
 )
 
+argparser.add_argument(
+    "-sp",
+    "--statsport",
+    dest="statsport",
+    default=8080,
+    help="Statistics port (default: 8080)",
+)
+
 argparser.add_argument("-t", "--test", dest="test", action="store_true", default=False, help="Run tests")
 
 argparser.add_argument(
@@ -49,14 +65,9 @@
     default=None,
     help="Stop NEBULA platform or nodes only (use '--stop nodes' to stop only the nodes)",
 )
-argparser.add_argument(
-    "-sp",
-    "--statsport",
-    dest="statsport",
-    default=8080,
-    help="Statistics port (default: 8080)",
-)
+
 argparser.add_argument("-s", "--simulation", action="store_false", dest="simulation", help="Run simulation")
+
 argparser.add_argument(
     "-c",
     "--config",
@@ -64,6 +75,7 @@
     default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "config"),
     help="Config directory path",
 )
+
 argparser.add_argument(
     "-l",
     "--logs",
@@ -71,6 +83,7 @@
     default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs"),
     help="Logs directory path",
 )
+
 argparser.add_argument(
     "-ce",
     "--certs",
@@ -78,6 +91,7 @@
     default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "certs"),
     help="Certs directory path",
 )
+
 argparser.add_argument(
     "-e",
     "--env",
@@ -85,6 +99,7 @@
     default=os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env"),
     help=".env file path",
 )
+
 argparser.add_argument(
     "-p",
     "--production",
@@ -93,6 +108,7 @@
     default=False,
     help="Production mode",
 )
+
 argparser.add_argument(
     "-ad",
     "--advanced",
@@ -101,6 +117,7 @@
     default=False,
     help="Advanced analytics",
 )
+
 argparser.add_argument(
     "-v",
     "--version",
@@ -108,6 +125,7 @@
     version="%(prog)s " + nebula.__version__,
     help="Show version",
 )
+
 argparser.add_argument(
     "-a",
     "--about",
@@ -115,6 +133,7 @@
     version="Created by Enrique TomΓ‘s MartΓ­nez BeltrΓ‘n",
     help="Show author",
 )
+
 argparser.add_argument("-h", "--help", action="help", default=argparse.SUPPRESS, help="Show help")
 
 args = argparser.parse_args()
diff --git a/docs/_prebuilt/installation.md b/docs/_prebuilt/installation.md
index 2b29572c..28c86b03 100644
--- a/docs/_prebuilt/installation.md
+++
b/docs/_prebuilt/installation.md @@ -125,7 +125,7 @@ frontend, controller, and nodes. ## Possible issues during the installation or execution -If frontend is not working, check the logs in app/logs/server.log +If frontend is not working, check the logs in app/logs/frontend.log If any of the following errors appear, take a look at the docker logs of the nebula-frontend container: diff --git a/nebula/addons/waf/Dockerfile-grafana b/nebula/addons/waf/Dockerfile-grafana index 068e7a81..cb710c51 100755 --- a/nebula/addons/waf/Dockerfile-grafana +++ b/nebula/addons/waf/Dockerfile-grafana @@ -1,5 +1,10 @@ FROM grafana/grafana:latest +ARG USER +ENV USER=${USER} + COPY ./grafana/dashboard_config.yml /etc/grafana/provisioning/dashboards/local.yml COPY ./grafana/automatic.yml /etc/grafana/provisioning/datasources/automatic.yml COPY ./grafana/dashboard.json /var/lib/grafana/dashboards/dashboard.json + +RUN sed -i "s|http://nebula|http://$USER|g" /etc/grafana/provisioning/datasources/automatic.yml diff --git a/nebula/addons/waf/Dockerfile-loki b/nebula/addons/waf/Dockerfile-loki index b0469ede..39ffaea7 100755 --- a/nebula/addons/waf/Dockerfile-loki +++ b/nebula/addons/waf/Dockerfile-loki @@ -1 +1,3 @@ FROM grafana/loki:latest + +COPY loki-config.yml /mnt/config/loki-config.yml diff --git a/nebula/addons/waf/Dockerfile-promtail b/nebula/addons/waf/Dockerfile-promtail index 4d6b787a..a5214e92 100755 --- a/nebula/addons/waf/Dockerfile-promtail +++ b/nebula/addons/waf/Dockerfile-promtail @@ -1 +1,8 @@ FROM grafana/promtail:latest + +ARG USER +ENV USER=${USER} + +COPY promtail-config.yml /etc/promtail/config.yml + +RUN sed -i "s|http://nebula|http://$USER|g" /etc/promtail/config.yml diff --git a/nebula/addons/waf/Dockerfile-waf b/nebula/addons/waf/Dockerfile-waf index 91d7fcd3..0af60bdc 100755 --- a/nebula/addons/waf/Dockerfile-waf +++ b/nebula/addons/waf/Dockerfile-waf @@ -3,6 +3,9 @@ FROM owasp/modsecurity-crs:3.3.5-nginx-202310170110 ARG NGINX_VERSION=1.24.0 +ARG USER +ENV USER=${USER} + # Installed necessary packages RUN apt-get update && apt-get install -y libmaxminddb0 libmaxminddb-dev mmdb-bin git wget RUN apt install -y build-essential libpcre3 libpcre3-dev zlib1g zlib1g-dev libssl-dev @@ -37,6 +40,9 @@ RUN wget https://git.io/GeoLite2-Country.mmdb -P /usr/share/GeoIP/ # nginx configuration files COPY default.conf /etc/nginx/templates/conf.d/default.conf.template + +RUN sed -i "s|http://nebula|http://${USER}|g" /etc/nginx/templates/conf.d/default.conf.template + COPY nginx.conf /etc/nginx/templates/nginx.conf.template # owasp crs diff --git a/nebula/addons/waf/default.conf b/nebula/addons/waf/default.conf index 135e70bb..e2ee5892 100755 --- a/nebula/addons/waf/default.conf +++ b/nebula/addons/waf/default.conf @@ -12,7 +12,7 @@ server { listen 80 default_server; server_name localhost; - set $upstream http://nebula-frontend; # Change this + set $upstream http://nebula-nebula-frontend; # Change this set $always_redirect off; modsecurity on; location /nebula { diff --git a/nebula/addons/waf/grafana/automatic.yml b/nebula/addons/waf/grafana/automatic.yml index 9417d65f..69f3b03d 100755 --- a/nebula/addons/waf/grafana/automatic.yml +++ b/nebula/addons/waf/grafana/automatic.yml @@ -1,6 +1,6 @@ datasources: - name: Loki type: loki - url: http://loki:3100 + url: http://nebula-nebula-waf-loki:3100 isDefault: true editable: true diff --git a/nebula/addons/waf/promtail-config.yml b/nebula/addons/waf/promtail-config.yml index cadcc583..e0b130fc 100755 --- a/nebula/addons/waf/promtail-config.yml +++ 
b/nebula/addons/waf/promtail-config.yml
@@ -6,7 +6,7 @@ positions:
   filename: /tmp/positions.yaml
 
 clients:
-  - url: http://loki:3100/loki/api/v1/push
+  - url: http://nebula-nebula-waf-loki:3100/loki/api/v1/push
 
 scrape_configs:
   - job_name: nginx
diff --git a/nebula/controller.py b/nebula/controller.py
index eb0356b4..96b7e8c6 100755
--- a/nebula/controller.py
+++ b/nebula/controller.py
@@ -1,15 +1,21 @@
+import asyncio
+import importlib
+import json
 import logging
 import os
 import re
 import signal
 import subprocess
 import sys
-import textwrap
 import threading
 import time
 
+import docker
 import psutil
+import torch
+import uvicorn
 from dotenv import load_dotenv
+from fastapi import FastAPI
 from watchdog.events import PatternMatchingEventHandler
 from watchdog.observers import Observer
 
@@ -18,6 +24,7 @@
 from nebula.config.mender import Mender
 from nebula.scenarios import ScenarioManagement
 from nebula.tests import main as deploy_tests
+from nebula.utils import DockerUtils, SocketUtils
 
 # Setup controller logger
@@ -31,27 +38,106 @@ def format(self, record):
         return super().format(record)
 
-log_console_format = "[%(levelname)s] - %(asctime)s - Controller - %(message)s"
-console_handler = logging.StreamHandler()
-console_handler.setLevel(logging.INFO)
-# console_handler.setFormatter(logging.Formatter(log_console_format))
-console_handler.setFormatter(TermEscapeCodeFormatter(log_console_format))
-logging.basicConfig(
-    level=logging.DEBUG,
-    handlers=[
-        console_handler,
-    ],
-)
+# Initialize FastAPI app outside the Controller class
+app = FastAPI()
 
-# Detect ctrl+c and run killports
-def signal_handler(sig, frame):
-    Controller.stop()
-    sys.exit(0)
+# Define endpoints outside the Controller class
+@app.get("/")
+async def read_root():
+    return {"message": "Welcome to the NEBULA Controller API"}
 
-signal.signal(signal.SIGTERM, signal_handler)
-signal.signal(signal.SIGINT, signal_handler)
+@app.get("/status")
+async def get_status():
+    return {"status": "NEBULA Controller API is running"}
+
+@app.get("/resources")
+async def get_resources():
+    devices = 0
+    gpu_memory_percent = []
+
+    # Obtain available RAM
+    memory_info = await asyncio.to_thread(psutil.virtual_memory)
+
+    if importlib.util.find_spec("pynvml") is not None:
+        try:
+            import pynvml
+            await asyncio.to_thread(pynvml.nvmlInit)
+            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)
+
+            # Obtain GPU info
+            for i in range(devices):
+                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
+                memory_info_gpu = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
+                memory_used_percent = (memory_info_gpu.used / memory_info_gpu.total) * 100
+                gpu_memory_percent.append(memory_used_percent)
+
+        except Exception:  # noqa: S110
+            pass
+
+    return {
+        # "cpu_percent": psutil.cpu_percent(),
+        "gpus": devices,
+        "memory_percent": memory_info.percent,
+        "gpu_memory_percent": gpu_memory_percent,
+    }
+
+@app.get("/least_memory_gpu")
+async def get_least_memory_gpu():
+    gpu_with_least_memory_index = None
+    max_memory_used_percent = 0
+
+    if importlib.util.find_spec("pynvml") is not None:
+        try:
+            import pynvml
+            await asyncio.to_thread(pynvml.nvmlInit)
+            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)
+
+            # Obtain GPU info
+            for i in range(devices):
+                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
+                memory_info = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
+                memory_used_percent = (memory_info.used / memory_info.total) * 100
+
+                # Select the GPU with the least free memory (highest usage)
+                if memory_used_percent > max_memory_used_percent:
+                    max_memory_used_percent = memory_used_percent
+                    gpu_with_least_memory_index = i
+
+        except Exception:  # noqa: S110
+            pass
+
+    return {
+        "gpu_with_least_memory_index": gpu_with_least_memory_index,
+    }
+
+@app.get("/available_gpus")
+async def get_available_gpu():
+    available_gpus = []
+
+    if importlib.util.find_spec("pynvml") is not None:
+        try:
+            import pynvml
+            await asyncio.to_thread(pynvml.nvmlInit)
+            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)
+
+            # Obtain GPU info
+            for i in range(devices):
+                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
+                memory_info = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
+                memory_used_percent = (memory_info.used / memory_info.total) * 100
+
+                # Consider a GPU available if less than 5% of its memory is in use
+                if memory_used_percent < 5:
+                    available_gpus.append(i)
+
+        except Exception:  # noqa: S110
+            pass
+
+    return {
+        "available_gpus": available_gpus,
+    }
 
 class NebulaEventHandler(PatternMatchingEventHandler):
@@ -96,6 +182,47 @@ def _processing_done(self, src_path: str):
         if src_path in self.processing_files:
             self.processing_files.remove(src_path)
 
+    def verify_nodes_ports(self, src_path):
+        parent_dir = os.path.dirname(src_path)
+        base_dir = os.path.basename(parent_dir)
+        scenario_path = os.path.join(os.path.dirname(parent_dir), base_dir)
+
+        try:
+            port_mapping = {}
+            new_port_start = 50001
+            for filename in os.listdir(scenario_path):
+                if filename.endswith(".json") and filename.startswith("participant"):
+                    file_path = os.path.join(scenario_path, filename)
+
+                    with open(file_path) as json_file:
+                        node = json.load(json_file)
+
+                    current_port = node["network_args"]["port"]
+                    port_mapping[current_port] = SocketUtils.find_free_port(start_port=new_port_start)
+                    new_port_start = port_mapping[current_port] + 1
+
+            for filename in os.listdir(scenario_path):
+                if filename.endswith(".json") and filename.startswith("participant"):
+                    file_path = os.path.join(scenario_path, filename)
+
+                    with open(file_path) as json_file:
+                        node = json.load(json_file)
+
+                    current_port = node["network_args"]["port"]
+                    node["network_args"]["port"] = port_mapping[current_port]
+                    neighbors = node["network_args"]["neighbors"]
+
+                    for old_port, new_port in port_mapping.items():
+                        neighbors = neighbors.replace(f":{old_port}", f":{new_port}")
+
+                    node["network_args"]["neighbors"] = neighbors
+
+                    with open(file_path, "w") as f:
+                        json.dump(node, f, indent=4)
+
+        except Exception as e:
+            print(f"Error processing JSON files: {e}")
+
     def on_created(self, event):
         """
         Handles the event when a file is created.
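The two-pass remapping that verify_nodes_ports applies can be illustrated with a small standalone sketch. The snippet below is a minimal approximation for illustration only: the remap_ports helper, the participants list, and the sequential next_port counter are assumptions introduced here, with the counter standing in for SocketUtils.find_free_port (which additionally verifies that each candidate port is actually free).

import json

def remap_ports(participants, start_port=50001):
    # First pass: assign every participant a fresh port and record old -> new.
    port_mapping = {}
    next_port = start_port
    for node in participants:
        old_port = node["network_args"]["port"]
        port_mapping[old_port] = next_port  # stand-in for SocketUtils.find_free_port
        next_port += 1

    # Second pass: apply the mapping to each node's own port and rewrite
    # every ":old_port" occurrence inside its neighbors string.
    for node in participants:
        node["network_args"]["port"] = port_mapping[node["network_args"]["port"]]
        neighbors = node["network_args"]["neighbors"]
        for old_port, new_port in port_mapping.items():
            neighbors = neighbors.replace(f":{old_port}", f":{new_port}")
        node["network_args"]["neighbors"] = neighbors
    return participants

# Example: two nodes that reference each other
nodes = [
    {"network_args": {"port": 45000, "neighbors": "192.168.50.3:45001"}},
    {"network_args": {"port": 45001, "neighbors": "192.168.50.2:45000"}},
]
print(json.dumps(remap_ports(nodes), indent=4))

Running the sketch moves the nodes to 50001 and 50002 and rewrites each neighbors string to point at the other node's new port, so the topology described by the participant files stays consistent after the remap.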
@@ -109,6 +236,7 @@ def on_created(self, event): return logging.info("File created: %s" % src_path) try: + self.verify_nodes_ports(src_path) self.run_script(src_path) finally: self._processing_done(src_path) @@ -203,6 +331,7 @@ def __init__(self, args): self.start_date_scenario = None self.federation = args.federation if hasattr(args, "federation") else None self.topology = args.topology if hasattr(args, "topology") else None + self.controller_port = args.controllerport if hasattr(args, "controllerport") else 5000 self.waf_port = args.wafport if hasattr(args, "wafport") else 6000 self.frontend_port = args.webport if hasattr(args, "webport") else 6060 self.grafana_port = args.grafanaport if hasattr(args, "grafanaport") else 6040 @@ -228,6 +357,19 @@ def __init__(self, args): self.network_subnet = args.network_subnet if hasattr(args, "network_subnet") else None self.network_gateway = args.network_gateway if hasattr(args, "network_gateway") else None + # Configure logger + self.configure_logger() + + # Check ports available + if not SocketUtils.is_port_open(self.controller_port): + self.controller_port = SocketUtils.find_free_port() + + if not SocketUtils.is_port_open(self.frontend_port): + self.frontend_port = SocketUtils.find_free_port() + + if not SocketUtils.is_port_open(self.statistics_port): + self.statistics_port = SocketUtils.find_free_port(self.frontend_port + 1) + self.config = Config(entity="controller") self.topologymanager = None self.n_nodes = 0 @@ -235,6 +377,33 @@ def __init__(self, args): self.use_blockchain = args.use_blockchain if hasattr(args, "use_blockchain") else False self.gpu_available = False + # Reference the global app instance + self.app = app + + def configure_logger(self): + log_console_format = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s" + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(TermEscapeCodeFormatter(log_console_format)) + console_handler_file = logging.FileHandler(os.path.join(self.log_dir, "controller.log"), mode="a") + console_handler_file.setLevel(logging.INFO) + console_handler_file.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s")) + logging.basicConfig( + level=logging.DEBUG, + handlers=[ + console_handler, + console_handler_file, + ], + ) + uvicorn_loggers = ["uvicorn", "uvicorn.error", "uvicorn.access"] + for logger_name in uvicorn_loggers: + logger = logging.getLogger(logger_name) + logger.handlers = [] # Remove existing handlers + logger.propagate = False # Prevent duplicate logs + handler = logging.FileHandler(os.path.join(self.log_dir, "controller.log"), mode="a") + handler.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s")) + logger.addHandler(handler) + def start(self): banner = """ β–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— β–ˆβ–ˆβ•— β–ˆβ–ˆβ•—β–ˆβ–ˆβ•— β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•— @@ -269,6 +438,11 @@ def start(self): os.environ["NEBULA_ROOT_HOST"] = self.root_path os.environ["NEBULA_HOST_PLATFORM"] = self.host_platform + # Start the FastAPI app in a daemon thread + app_thread = threading.Thread(target=self.run_controller_api, daemon=True) + app_thread.start() + logging.info(f"NEBULA Controller is running at port {self.controller_port}") + if self.production: self.run_waf() logging.info(f"NEBULA WAF is running at port {self.waf_port}") @@ -278,12 +452,12 @@ def start(self): self.run_test() else: self.run_frontend() - logging.info(f"NEBULA Frontend is running at port 
{self.frontend_port}") + logging.info(f"NEBULA Frontend is running at http://localhost:{self.frontend_port}") # Watchdog for running additional scripts in the host machine (i.e. during the execution of a federation) event_handler = NebulaEventHandler() observer = Observer() - observer.schedule(event_handler, path=self.config_dir, recursive=False) + observer.schedule(event_handler, path=self.config_dir, recursive=True) observer.start() if self.mender: @@ -312,6 +486,11 @@ def start(self): sys.exit(0) logging.info("Press Ctrl+C for exit from NEBULA (global exit)") + + # Adjust signal handling inside the start method + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGINT, self.signal_handler) + try: while True: time.sleep(1) @@ -322,172 +501,132 @@ def start(self): observer.join() - def run_waf(self): - docker_compose_template = textwrap.dedent( - """ - services: - {} - """ - ) + def signal_handler(self, sig, frame): + # Handle termination signals + logging.info("Received termination signal, shutting down...") + self.stop() + sys.exit(0) - waf_template = textwrap.dedent( - """ - nebula-waf: - container_name: nebula-waf - image: nebula-waf - build: - context: . - dockerfile: Dockerfile-waf - restart: unless-stopped - volumes: - - {log_path}/waf/nginx:/var/log/nginx - extra_hosts: - - "host.docker.internal:host-gateway" - ipc: host - privileged: true - ports: - - {waf_port}:80 - networks: - nebula-net-base: - ipv4_address: {ip} - """ + def run_controller_api(self): + uvicorn.run( + self.app, + host="0.0.0.0", + port=self.controller_port, + log_config=None, # Prevent Uvicorn from configuring logging ) - grafana_template = textwrap.dedent( - """ - grafana: - container_name: nebula-waf-grafana - image: nebula-waf-grafana - build: - context: . - dockerfile: Dockerfile-grafana - restart: unless-stopped - environment: - - GF_SECURITY_ADMIN_PASSWORD=admin - - GF_USERS_ALLOW_SIGN_UP=false - - GF_SERVER_HTTP_PORT=3000 - - GF_SERVER_PROTOCOL=http - - GF_SERVER_DOMAIN=localhost:{grafana_port} - - GF_SERVER_ROOT_URL=http://localhost:{grafana_port}/grafana/ - - GF_SERVER_SERVE_FROM_SUB_PATH=true - - GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/dashboard.json - - GF_METRICS_MAX_LIMIT_TSDB=0 - ports: - - {grafana_port}:3000 - ipc: host - privileged: true - networks: - nebula-net-base: - ipv4_address: {ip} - """ + def run_waf(self): + network_name = f"{os.environ['USER']}-nebula-net-base" + base = DockerUtils.create_docker_network(network_name) + + client = docker.from_env() + + volumes_waf = ["/var/log/nginx"] + + ports_waf = [80] + + host_config_waf = client.api.create_host_config( + binds=[f"{os.environ['NEBULA_LOGS_DIR']}/waf/nginx:/var/log/nginx"], + privileged=True, + port_bindings={80: self.waf_port}, ) - loki_template = textwrap.dedent( - """ - loki: - container_name: nebula-waf-loki - image: nebula-waf-loki - build: - context: . 
- dockerfile: Dockerfile-loki - restart: unless-stopped - volumes: - - ./loki-config.yml:/mnt/config/loki-config.yml - ports: - - {loki_port}:3100 - user: "0:0" - command: - - '-config.file=/mnt/config/loki-config.yml' - networks: - nebula-net-base: - ipv4_address: {ip} - """ + networking_config_waf = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.200") + }) + + container_id_waf = client.api.create_container( + image="nebula-waf", + name=f"{os.environ['USER']}-nebula-waf", + detach=True, + volumes=volumes_waf, + host_config=host_config_waf, + networking_config=networking_config_waf, + ports=ports_waf, ) - promtail_template = textwrap.dedent( - """ - promtail: - container_name: nebula-waf-promtail - image: nebula-waf-promtail - build: - context: . - dockerfile: Dockerfile-promtail - restart: unless-stopped - volumes: - - {log_path}/waf/nginx:/var/log/nginx - - ./promtail-config.yml:/etc/promtail/config.yml - command: - - '-config.file=/etc/promtail/config.yml' - networks: - nebula-net-base: - ipv4_address: {ip} - """ + client.api.start(container_id_waf) + + environment = { + "GF_SECURITY_ADMIN_PASSWORD": "admin", + "GF_USERS_ALLOW_SIGN_UP": "false", + "GF_SERVER_HTTP_PORT": "3000", + "GF_SERVER_PROTOCOL": "http", + "GF_SERVER_DOMAIN": f"localhost:{self.grafana_port}", + "GF_SERVER_ROOT_URL": f"http://localhost:{self.grafana_port}/grafana/", + "GF_SERVER_SERVE_FROM_SUB_PATH": "true", + "GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH": "/var/lib/grafana/dashboards/dashboard.json", + "GF_METRICS_MAX_LIMIT_TSDB": "0", + } + + ports = [3000] + + host_config = client.api.create_host_config( + port_bindings={3000: self.grafana_port}, ) - waf_template = textwrap.indent(waf_template, " " * 4) - grafana_template = textwrap.indent(grafana_template, " " * 4) - loki_template = textwrap.indent(loki_template, " " * 4) - promtail_template = textwrap.indent(promtail_template, " " * 4) - - network_template = textwrap.dedent( - """ - networks: - nebula-net-base: - name: nebula-net-base - driver: bridge - ipam: - config: - - subnet: {} - gateway: {} - """ + networking_config = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.201") + }) + + container_id = client.api.create_container( + image="nebula-waf-grafana", + name=f"{os.environ['USER']}-nebula-waf-grafana", + detach=True, + environment=environment, + host_config=host_config, + networking_config=networking_config, + ports=ports, ) - # Generate the Docker Compose file dynamically - services = "" - services += waf_template.format( - path=self.root_path, - log_path=os.environ["NEBULA_LOGS_DIR"], - waf_port=self.waf_port, - gw="192.168.10.1", - ip="192.168.10.200", + client.api.start(container_id) + + command = ["-config.file=/mnt/config/loki-config.yml"] + + ports_loki = [3100] + + host_config_loki = client.api.create_host_config( + port_bindings={3100: self.loki_port}, ) - services += grafana_template.format( - log_path=os.environ["NEBULA_LOGS_DIR"], - grafana_port=self.grafana_port, - loki_port=self.loki_port, - ip="192.168.10.201", + networking_config_loki = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.202") + }) + + container_id_loki = client.api.create_container( + image="nebula-waf-loki", + name=f"{os.environ['USER']}-nebula-waf-loki", + detach=True, + command=command, + host_config=host_config_loki, + networking_config=networking_config_loki, + 
ports=ports_loki, ) - services += loki_template.format(loki_port=self.loki_port, ip="192.168.10.202") + client.api.start(container_id_loki) - services += promtail_template.format(log_path=os.environ["NEBULA_LOGS_DIR"], ip="192.168.10.203") + volumes_promtail = ["/var/log/nginx"] - docker_compose_file = docker_compose_template.format(services) - docker_compose_file += network_template.format("192.168.10.0/24", "192.168.10.1") + host_config_promtail = client.api.create_host_config( + binds=[ + f"{os.environ['NEBULA_LOGS_DIR']}/waf/nginx:/var/log/nginx", + ], + ) - # Write the Docker Compose file in waf directory - with open( - f"{os.path.join(os.environ['NEBULA_ROOT'], 'nebula', 'addons', 'waf', 'docker-compose.yml')}", - "w", - ) as f: - f.write(docker_compose_file) + networking_config_promtail = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.203") + }) + + container_id_promtail = client.api.create_container( + image="nebula-waf-promtail", + name=f"{os.environ['USER']}-nebula-waf-promtail", + detach=True, + volumes=volumes_promtail, + host_config=host_config_promtail, + networking_config=networking_config_promtail, + ) - # Start the Docker Compose file, catch error if any - try: - subprocess.check_call([ - "docker", - "compose", - "-f", - f"{os.path.join(os.environ['NEBULA_ROOT'], 'nebula', 'addons', 'waf', 'docker-compose.yml')}", - "up", - "--build", - "-d", - ]) - except subprocess.CalledProcessError: - raise Exception( - "Docker Compose failed to start, please check if Docker Compose is installed (https://docs.docker.com/compose/install/) and Docker Engine is running." - ) + client.api.start(container_id_promtail) def run_frontend(self): if sys.platform == "win32": @@ -501,232 +640,84 @@ def run_frontend(self): "/var/run/docker.sock not found, please check if Docker is running and Docker Compose is installed." 
) - docker_compose_template = textwrap.dedent( - """ - services: - {} - """ - ) - - frontend_template = textwrap.dedent( - """ - nebula-frontend: - container_name: nebula-frontend - image: nebula-frontend - build: - context: {path} - dockerfile: nebula/frontend/Dockerfile - restart: unless-stopped - volumes: - - {path}:/nebula - - /var/run/docker.sock:/var/run/docker.sock - - ./config/nebula:/etc/nginx/sites-available/default - environment: - - NEBULA_PRODUCTION={production} - - NEBULA_GPU_AVAILABLE={gpu_available} - - NEBULA_ADVANCED_ANALYTICS={advanced_analytics} - - NEBULA_SERVER_LOG=/nebula/app/logs/server.log - - NEBULA_LOGS_DIR=/nebula/app/logs/ - - NEBULA_CONFIG_DIR=/nebula/app/config/ - - NEBULA_CERTS_DIR=/nebula/app/certs/ - - NEBULA_ENV_PATH=/nebula/app/.env - - NEBULA_ROOT_HOST={path} - - NEBULA_HOST_PLATFORM={platform} - - NEBULA_DEFAULT_USER=admin - - NEBULA_DEFAULT_PASSWORD=admin - - NEBULA_FRONTEND_PORT={frontend_port} - extra_hosts: - - "host.docker.internal:host-gateway" - ipc: host - privileged: true - ports: - - {frontend_port}:80 - - {statistics_port}:8080 - networks: - nebula-net-base: - ipv4_address: {ip} - """ - ) - frontend_template = textwrap.indent(frontend_template, " " * 4) - - network_template = textwrap.dedent( - """ - networks: - nebula-net-base: - name: nebula-net-base - driver: bridge - ipam: - config: - - subnet: {} - gateway: {} - """ - ) - - network_template_external = textwrap.dedent( - """ - networks: - nebula-net-base: - external: true - """ - ) - try: subprocess.check_call(["nvidia-smi"]) self.gpu_available = True except Exception: logging.info("No GPU available for the frontend, nodes will be deploy in CPU mode") - # Generate the Docker Compose file dynamically - services = "" - services += frontend_template.format( - production=self.production, - gpu_available=self.gpu_available, - advanced_analytics=self.advanced_analytics, - path=self.root_path, - platform=self.host_platform, - gw="192.168.10.1", - ip="192.168.10.100", - frontend_port=self.frontend_port, - statistics_port=self.statistics_port, + network_name = f"{os.environ['USER']}-nebula-net-base" + + # Create the Docker network + base = DockerUtils.create_docker_network(network_name) + + client = docker.from_env() + + environment = { + "NEBULA_CONTROLLER_NAME": os.environ["USER"], + "NEBULA_PRODUCTION": self.production, + "NEBULA_GPU_AVAILABLE": self.gpu_available, + "NEBULA_ADVANCED_ANALYTICS": self.advanced_analytics, + "NEBULA_FRONTEND_LOG": "/nebula/app/logs/frontend.log", + "NEBULA_LOGS_DIR": "/nebula/app/logs/", + "NEBULA_CONFIG_DIR": "/nebula/app/config/", + "NEBULA_CERTS_DIR": "/nebula/app/certs/", + "NEBULA_ENV_PATH": "/nebula/app/.env", + "NEBULA_ROOT_HOST": self.root_path, + "NEBULA_HOST_PLATFORM": self.host_platform, + "NEBULA_DEFAULT_USER": "admin", + "NEBULA_DEFAULT_PASSWORD": "admin", + "NEBULA_FRONTEND_PORT": self.frontend_port, + "NEBULA_CONTROLLER_PORT": self.controller_port, + "NEBULA_CONTROLLER_HOST": "host.docker.internal", + } + + volumes = ["/nebula", "/var/run/docker.sock", "/etc/nginx/sites-available/default"] + + ports = [80, 8080] + + host_config = client.api.create_host_config( + binds=[ + f"{self.root_path}:/nebula", + "/var/run/docker.sock:/var/run/docker.sock", + f"{self.root_path}/nebula/frontend/config/nebula:/etc/nginx/sites-available/default", + ], + extra_hosts={"host.docker.internal": "host-gateway"}, + port_bindings={80: self.frontend_port, 8080: self.statistics_port}, ) - docker_compose_file = docker_compose_template.format(services) - if self.production: 
- # If WAF is enabled, we need to use the same network - docker_compose_file += network_template_external - else: - docker_compose_file += network_template.format("192.168.10.0/24", "192.168.10.1") - # Write the Docker Compose file in config directory - with open( - f"{os.path.join(os.environ['NEBULA_ROOT'], 'nebula', 'frontend', 'docker-compose.yml')}", - "w", - ) as f: - f.write(docker_compose_file) - - # Start the Docker Compose file, catch error if any - try: - subprocess.check_call([ - "docker", - "compose", - "-f", - f"{os.path.join(os.environ['NEBULA_ROOT'], 'nebula', 'frontend', 'docker-compose.yml')}", - "up", - "--build", - "-d", - ]) - except subprocess.CalledProcessError: - raise Exception( - "Docker Compose failed to start, please check if Docker Compose is installed (https://docs.docker.com/compose/install/) and Docker Engine is running." - ) + networking_config = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.100") + }) + + container_id = client.api.create_container( + image="nebula-frontend", + name=f"{os.environ['USER']}-nebula-frontend", + detach=True, + environment=environment, + volumes=volumes, + host_config=host_config, + networking_config=networking_config, + ports=ports, + ) - except Exception as e: - raise Exception(f"Error while starting the frontend: {e}") + client.api.start(container_id) def run_test(self): deploy_tests.start() - @staticmethod - def stop_frontend(): - if sys.platform == "win32": - try: - # kill all the docker containers which contain the word "nebula" - commands = [ - """docker kill $(docker ps -q --filter name=^nebula-frontend$) | Out-Null""", - """docker rm $(docker ps -a -q --filter name=^nebula-frontend$) | Out-Null""", - ] - - for command in commands: - time.sleep(1) - exit_code = os.system(f'powershell.exe -Command "{command}"') - # logging.info(f"Windows Command '{command}' executed with exit code: {exit_code}") - - except Exception as e: - raise Exception(f"Error while killing docker containers: {e}") - else: - try: - commands = [ - """docker kill $(docker ps -q --filter name=^nebula-frontend$) > /dev/null 2>&1""", - """docker rm $(docker ps -a -q --filter name=^nebula-frontend$) > /dev/null 2>&1""", - ] - - for command in commands: - time.sleep(1) - exit_code = os.system(command) - # logging.info(f"Linux Command '{command}' executed with exit code: {exit_code}") - - except Exception as e: - raise Exception(f"Error while killing docker containers: {e}") - - @staticmethod - def stop_network(): - if sys.platform == "win32": - try: - # kill all the docker containers which contain the word "nebula" - commands = [ - r"""docker network rm $(docker network ls | Where-Object { ($_ -split '\s+')[1] -eq 'nebula-net-base' } | ForEach-Object { ($_ -split '\s+')[0] }) | Out-Null""" - ] - - for command in commands: - time.sleep(1) - exit_code = os.system(f'powershell.exe -Command "{command}"') - # logging.info(f"Windows Command '{command}' executed with exit code: {exit_code}") - - except Exception as e: - raise Exception(f"Error while killing docker containers: {e}") - else: - try: - commands = [ - """docker network rm $(docker network ls | grep '^nebula-net-base$' | awk '{print $1}') > /dev/null 2>&1""" - ] - - for command in commands: - time.sleep(1) - exit_code = os.system(command) - # logging.info(f"Linux Command '{command}' executed with exit code: {exit_code}") - - except Exception as e: - raise Exception(f"Error while killing docker containers: {e}") - @staticmethod def 
stop_waf():
-        if sys.platform == "win32":
-            try:
-                # kill all the docker containers which contain the word "nebula"
-                commands = [
-                    """docker compose -p waf down | Out-Null""",
-                    """docker compose -p waf rm | Out-Null""",
-                ]
-
-                for command in commands:
-                    time.sleep(1)
-                    exit_code = os.system(f'powershell.exe -Command "{command}"')
-                    # logging.info(f"Windows Command '{command}' executed with exit code: {exit_code}")
-
-            except Exception as e:
-                raise Exception(f"Error while killing docker containers: {e}")
-        else:
-            try:
-                commands = [
-                    """docker compose -p waf down > /dev/null 2>&1""",
-                    """docker compose -p waf rm > /dev/null 2>&1""",
-                ]
-
-                for command in commands:
-                    time.sleep(1)
-                    exit_code = os.system(command)
-                    # logging.info(f"Linux Command '{command}' executed with exit code: {exit_code}")
-
-            except Exception as e:
-                raise Exception(f"Error while killing docker containers: {e}")
+        DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}-nebula-waf")
 
     @staticmethod
     def stop():
         logging.info("Closing NEBULA (exiting from components)... Please wait")
-        ScenarioManagement.stop_participants()
+        DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}")
         ScenarioManagement.stop_blockchain()
-        Controller.stop_frontend()
+        ScenarioManagement.stop_participants()
         Controller.stop_waf()
-        Controller.stop_network()
+        DockerUtils.remove_docker_networks_by_prefix(f"{os.environ['USER']}")
         controller_pid_file = os.path.join(os.path.dirname(__file__), "controller.pid")
         try:
             with open(controller_pid_file) as f:
diff --git a/nebula/core/network/connection.py b/nebula/core/network/connection.py
index 21d75519..4cbd85c3 100755
--- a/nebula/core/network/connection.py
+++ b/nebula/core/network/connection.py
@@ -218,7 +218,7 @@ async def send(
             await self._send_chunks(message_id, data_to_send)
         except Exception as e:
             logging.exception(f"Error sending data: {e}")
-            await self.reconnect()
+            # await self.reconnect()
 
     def _prepare_data(self, data: Any, pb: bool, encoding_type: str) -> tuple[bytes, bytes]:
         if pb:
@@ -287,7 +287,7 @@ async def handle_incoming_message(self) -> None:
             logging.info("Message handling cancelled")
         except ConnectionError as e:
             logging.exception(f"Connection closed while reading: {e}")
-            await self.reconnect()
+            # await self.reconnect()
         except Exception as e:
             logging.exception(f"Error handling incoming message: {e}")
 
diff --git a/nebula/core/training/lightning.py b/nebula/core/training/lightning.py
index a03ff083..98bc841f 100755
--- a/nebula/core/training/lightning.py
+++ b/nebula/core/training/lightning.py
@@ -173,15 +173,20 @@ def create_logger(self):
     def create_trainer(self):
         # Create a new trainer and logger for each round
         self.create_logger()
-        num_gpus = torch.cuda.device_count()
+        num_gpus = len(self.config.participant["device_args"]["gpu_id"])
         if self.config.participant["device_args"]["accelerator"] == "gpu" and num_gpus > 0:
-            gpu_index = self.config.participant["device_args"]["idx"] % num_gpus
+            # Distribute participants across the assigned GPUs (round-robin on the participant index)
+            if num_gpus > 1:
+                gpu_index = [self.config.participant["device_args"]["idx"] % num_gpus]
+            # Use the single assigned GPU
+            else:
+                gpu_index = self.config.participant["device_args"]["gpu_id"]
             logging_training.info(f"Creating trainer with accelerator GPU ({gpu_index})")
             self._trainer = Trainer(
                 callbacks=[ModelSummary(max_depth=1), NebulaProgressBar()],
                 max_epochs=self.epochs,
                 accelerator=self.config.participant["device_args"]["accelerator"],
-                devices=[gpu_index],
+                devices=gpu_index,
                 logger=self._logger,
                 enable_checkpointing=False,
enable_model_summary=False, diff --git a/nebula/frontend/app.py b/nebula/frontend/app.py index 5a454087..3635b66e 100755 --- a/nebula/frontend/app.py +++ b/nebula/frontend/app.py @@ -10,6 +10,7 @@ import zipfile from urllib.parse import urlencode +import aiohttp import requests from dotenv import load_dotenv @@ -18,6 +19,9 @@ class Settings: + controller_host: str = os.environ.get("NEBULA_CONTROLLER_HOST") + controller_port: int = os.environ.get("NEBULA_CONTROLLER_PORT", 5000) + resources_threshold: float = 90.0 port: int = os.environ.get("NEBULA_FRONTEND_PORT", 6060) production: bool = os.environ.get("NEBULA_PRODUCTION", "False") == "True" gpu_available: bool = os.environ.get("NEBULA_GPU_AVAILABLE", "False") == "True" @@ -32,7 +36,7 @@ class Settings: statistics_port: int = os.environ.get("NEBULA_STATISTICS_PORT", 8080) PERMANENT_SESSION_LIFETIME: datetime.timedelta = datetime.timedelta(minutes=60) templates_dir: str = "templates" - server_log: str = os.environ.get("NEBULA_SERVER_LOG", "/nebula/app/logs/server.log") + frontend_log: str = os.environ.get("NEBULA_FRONTEND_LOG", "/nebula/app/logs/frontend.log") settings = Settings() @@ -42,7 +46,7 @@ class Settings: format="[%(asctime)s] [%(levelname)s] %(message)s", handlers=[ logging.StreamHandler(), - logging.FileHandler(settings.server_log, mode="w"), + logging.FileHandler(settings.frontend_log, mode="w"), ], ) @@ -50,7 +54,7 @@ class Settings: for logger_name in uvicorn_loggers: logger = logging.getLogger(logger_name) logger.propagate = False # Prevent duplicate logs - handler = logging.FileHandler(settings.server_log, mode="a") + handler = logging.FileHandler(settings.frontend_log, mode="a") handler.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s")) logger.addHandler(handler) @@ -93,6 +97,7 @@ class Settings: get_notes, get_running_scenario, get_scenario_by_name, + get_user_by_scenario_name, get_user_info, initialize_databases, list_nodes_by_scenario_name, @@ -109,7 +114,7 @@ class Settings: verify, verify_hash_algorithm, ) -from nebula.frontend.utils import Utils +from nebula.utils import DockerUtils, FileUtils logging.info(f"πŸš€ Starting Nebula Frontend on port {settings.port}") @@ -125,7 +130,11 @@ class Settings: logging.info("SECRET_KEY already set") app = FastAPI() -app.add_middleware(SessionMiddleware, secret_key=os.environ.get("SECRET_KEY")) +app.add_middleware( + SessionMiddleware, + secret_key=os.environ.get("SECRET_KEY"), + session_cookie=f"session_{os.environ.get('NEBULA_FRONTEND_PORT')}", +) app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -217,13 +226,18 @@ async def startup_event(): set_default_user() -nodes_registration = {} - -scenarios_list = [] +class UserData: + def __init__(self): + self.nodes_registration = {} + self.scenarios_list = [] + self.scenarios_list_length = 0 + self.scenarios_finished = 0 + self.nodes_finished = [] + self.stop_all_scenarios_event = asyncio.Event() + self.finish_scenario_event = asyncio.Event() -scenarios_list_length = 0 -scenarios_finished = 0 +user_data_store = {} # Detect CTRL+C from parent process @@ -382,10 +396,10 @@ async def nebula_add_user( if session.get("role") == "admin": # only Admin should be able to add user. 
user_list = list_users(all_info=True)
         if user.upper() in user_list or " " in user or "'" in user or '"' in user:
-            return RedirectResponse(url="/nebula/admin")
+            return RedirectResponse(url="/nebula/admin", status_code=status.HTTP_303_SEE_OTHER)
         else:
             add_user(user, password, role)
-            return RedirectResponse(url="/nebula/admin")
+            return RedirectResponse(url="/nebula/admin", status_code=status.HTTP_303_SEE_OTHER)
     else:
         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
 
@@ -398,15 +412,10 @@ async def nebula_update_user(
     password: str = Form(...),
     role: str = Form(...),
 ):
-    if session.get("role") == "admin":
-        user_list = list_users()
-        if user not in user_list:
-            return RedirectResponse(url="/nebula/admin")
-        else:
-            update_user(user, password, role)
-            return RedirectResponse(url="/nebula/admin")
-    else:
-        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
+    if "user" not in session or session["role"] != "admin":
+        return RedirectResponse(url="/nebula", status_code=status.HTTP_302_FOUND)
+    update_user(user, password, role)
+    return RedirectResponse(url="/nebula/admin", status_code=status.HTTP_302_FOUND)
 
 @app.get("/nebula/api/dashboard/runningscenario", response_class=JSONResponse)
@@ -420,20 +429,103 @@ async def nebula_dashboard_runningscenario():
         return JSONResponse({"scenario_status": "not running"})
 
+async def get_host_resources():
+    url = f"http://{settings.controller_host}:{settings.controller_port}/resources"
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as response:
+            if response.status == 200:
+                try:
+                    return await response.json()
+                except Exception as e:
+                    return {"error": f"Failed to parse JSON: {e}"}
+            else:
+                return None
+
+
+async def get_available_gpus():
+    url = f"http://{settings.controller_host}:{settings.controller_port}/available_gpus"
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as response:
+            if response.status == 200:
+                try:
+                    return await response.json()
+                except Exception as e:
+                    return {"error": f"Failed to parse JSON: {e}"}
+            else:
+                return None
+
+
+async def get_least_memory_gpu():
+    url = f"http://{settings.controller_host}:{settings.controller_port}/least_memory_gpu"
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as response:
+            if response.status == 200:
+                try:
+                    return await response.json()
+                except Exception as e:
+                    return {"error": f"Failed to parse JSON: {e}"}
+            else:
+                return None
+
+
+async def check_enough_resources():
+    resources = await get_host_resources()
+
+    mem_percent = resources.get("memory_percent")
+    gpu_memory_percent = resources.get("gpu_memory_percent", [])
+
+    # if cpu_percent >= settings.resources_threshold or mem_percent >= settings.resources_threshold:
+    if mem_percent >= settings.resources_threshold:
+        return False
+    elif len(gpu_memory_percent) > 0:
+        for gpu_mem in gpu_memory_percent:
+            if gpu_mem >= settings.resources_threshold:
+                return False
+
+    return True
+
+
+async def monitor_resources(user):
+    user_data = user_data_store[user]
+
+    while user_data.scenarios_list_length > 0:
+        enough_resources = await check_enough_resources()
+        if not enough_resources:
+            running_scenario = get_running_scenario(user)
+            if running_scenario:
+                # Which GPU has the highest memory consumption
+                gpu = await get_least_memory_gpu()
+                # Stop the scenario if it is using that GPU
+                running_scenario_as_dict = dict(running_scenario)
+                if running_scenario_as_dict["gpu_id"] == gpu.get("gpu_with_least_memory_index"):
+                    scenario_name = 
running_scenario_as_dict["name"] + stop_scenario(scenario_name, user) + user_data.scenarios_list_length -= 1 + user_data.finish_scenario_event.set() + + await asyncio.sleep(5) + + + @app.get("/nebula/api/dashboard", response_class=JSONResponse) @app.get("/nebula/dashboard", response_class=HTMLResponse) async def nebula_dashboard(request: Request, session: dict = Depends(get_session)): if "user" in session: - scenarios = get_all_scenarios_and_check_completed() # Get all scenarios after checking if they are completed + scenarios = get_all_scenarios_and_check_completed( + username=session["user"], role=session["role"] + ) # Get all scenarios after checking if they are completed scenario_running = get_running_scenario() + if session["user"] not in user_data_store: + user_data_store[session["user"]] = UserData() + + user_data = user_data_store[session["user"]] else: scenarios = None scenario_running = None bool_completed = False if scenario_running: - bool_completed = scenario_running[5] == "completed" - + bool_completed = scenario_running[6] == "completed" if scenarios: if request.url.path == "/nebula/dashboard": return templates.TemplateResponse( @@ -441,11 +533,13 @@ async def nebula_dashboard(request: Request, session: dict = Depends(get_session { "request": request, "scenarios": scenarios, - "scenarios_list_length": scenarios_list_length, - "scenarios_finished": scenarios_finished, + "scenarios_list_length": user_data.scenarios_list_length, + "scenarios_finished": user_data.scenarios_finished, "scenario_running": scenario_running, "scenario_completed": bool_completed, "user_logged_in": session.get("user"), + "user_role": session.get("role"), + "user_data_store": user_data_store, }, ) elif request.url.path == "/nebula/api/dashboard": @@ -503,7 +597,7 @@ async def nebula_dashboard_monitor(scenario_name: str, request: Request, session strict=False, # Status ) - topology_path = Utils.check_path(settings.config_dir, os.path.join(scenario_name, "topology.png")) + topology_path = FileUtils.check_path(settings.config_dir, os.path.join(scenario_name, "topology.png")) if os.path.exists(topology_path): latest_participant_file_mtime = max([ os.path.getmtime( @@ -604,7 +698,7 @@ def update_topology(scenario_name, nodes_list, nodes_config): @app.post("/nebula/dashboard/{scenario_name}/node/update") -async def nebula_update_node(scenario_name: str, request: Request, session: dict = Depends(get_session)): +async def nebula_update_node(scenario_name: str, request: Request): if request.method == "POST": if request.headers.get("content-type") == "application/json": config = await request.json() @@ -659,16 +753,21 @@ async def nebula_update_node(scenario_name: str, request: Request, session: dict @app.post("/nebula/dashboard/{scenario_name}/node/register") -async def nebula_register_node(scenario_name: str, request: Request): +async def nebula_register_node(scenario_name: str, request: Request, session: dict = Depends(get_session)): + user_data = user_data_store[session["user"]] + if request.headers.get("content-type") == "application/json": data = await request.json() node = data["node"] logging.info(f"Registering node {node} for scenario {scenario_name}") - async with nodes_registration[scenario_name]["condition"]: - nodes_registration[scenario_name]["nodes"].add(node) + async with user_data.nodes_registration[scenario_name]["condition"]: + user_data.nodes_registration[scenario_name]["nodes"].add(node) logging.info(f"Node {node} registered") - if len(nodes_registration[scenario_name]["nodes"]) == 
nodes_registration[scenario_name]["n_nodes"]: - nodes_registration[scenario_name]["condition"].notify_all() + if ( + len(user_data.nodes_registration[scenario_name]["nodes"]) + == user_data.nodes_registration[scenario_name]["n_nodes"] + ): + user_data.nodes_registration[scenario_name]["condition"].notify_all() logging.info("All nodes registered") return JSONResponse({"message": "Node registered", "status": "success"}, status_code=200) @@ -678,11 +777,13 @@ async def nebula_register_node(scenario_name: str, request: Request): @app.get("/nebula/dashboard/scenarios/node/list") async def nebula_list_all_scenarios(session: dict = Depends(get_session)): + user_data = user_data_store[session["user"]] + if "user" not in session or session["role"] not in ["admin", "user"]: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") scenarios = {} - for scenario_name, scenario_info in nodes_registration.items(): + for scenario_name, scenario_info in user_data.nodes_registration.items(): scenarios[scenario_name] = list(scenario_info["nodes"]) if not scenarios: @@ -693,27 +794,34 @@ async def nebula_list_all_scenarios(session: dict = Depends(get_session)): @app.get("/nebula/dashboard/scenarios/node/erase") async def nebula_erase_all_nodes(session: dict = Depends(get_session)): + user_data = user_data_store[session["user"]] + if "user" not in session or session["role"] not in ["admin", "user"]: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") - nodes_registration.clear() + user_data.nodes_registration.clear() return JSONResponse({"message": "All nodes erased", "status": "success"}, status_code=200) @app.get("/nebula/dashboard/{scenario_name}/node/wait") -async def nebula_wait_nodes(scenario_name: str): - if scenario_name not in nodes_registration: +async def nebula_wait_nodes(scenario_name: str, session: dict = Depends(get_session)): + user_data = user_data_store[session["user"]] + + if scenario_name not in user_data.nodes_registration: return JSONResponse({"message": "Scenario not found", "status": "error"}, status_code=404) - async with nodes_registration[scenario_name]["condition"]: - while len(nodes_registration[scenario_name]["nodes"]) < nodes_registration[scenario_name]["n_nodes"]: - await nodes_registration[scenario_name]["condition"].wait() + async with user_data.nodes_registration[scenario_name]["condition"]: + while ( + len(user_data.nodes_registration[scenario_name]["nodes"]) + < user_data.nodes_registration[scenario_name]["n_nodes"] + ): + await user_data.nodes_registration[scenario_name]["condition"].wait() return JSONResponse({"message": "All nodes registered", "status": "success"}, status_code=200) @app.get("/nebula/dashboard/{scenario_name}/node/{id}/infolog") async def nebula_monitor_log(scenario_name: str, id: str): - logs = Utils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}.log")) + logs = FileUtils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}.log")) if os.path.exists(logs): return FileResponse(logs, media_type="text/plain", filename=f"participant_{id}.log") else: @@ -725,7 +833,7 @@ async def nebula_monitor_log(scenario_name: str, id: str): response_class=PlainTextResponse, ) async def nebula_monitor_log_x(scenario_name: str, id: str, number: int): - logs = Utils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}.log")) + logs = FileUtils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}.log")) if 
os.path.exists(logs): with open(logs) as f: lines = f.readlines()[-number:] @@ -739,7 +847,7 @@ async def nebula_monitor_log_x(scenario_name: str, id: str, number: int): @app.get("/nebula/dashboard/{scenario_name}/node/{id}/debuglog") async def nebula_monitor_log_debug(scenario_name: str, id: str): - logs = Utils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}_debug.log")) + logs = FileUtils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}_debug.log")) if os.path.exists(logs): return FileResponse(logs, media_type="text/plain", filename=f"participant_{id}_debug.log") else: @@ -748,7 +856,7 @@ async def nebula_monitor_log_debug(scenario_name: str, id: str): @app.get("/nebula/dashboard/{scenario_name}/node/{id}/errorlog") async def nebula_monitor_log_error(scenario_name: str, id: str): - logs = Utils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}_error.log")) + logs = FileUtils.check_path(settings.log_dir, os.path.join(scenario_name, f"participant_{id}_error.log")) if os.path.exists(logs): return FileResponse(logs, media_type="text/plain", filename=f"participant_{id}_error.log") else: @@ -757,21 +865,25 @@ async def nebula_monitor_log_error(scenario_name: str, id: str): @app.get("/nebula/dashboard/{scenario_name}/topology/image/") async def nebula_monitor_image(scenario_name: str): - topology_image = Utils.check_path(settings.log_dir, os.path.join(scenario_name, "topology.png")) + topology_image = FileUtils.check_path(settings.log_dir, os.path.join(scenario_name, "topology.png")) if os.path.exists(topology_image): return FileResponse(topology_image, media_type="image/png") else: raise HTTPException(status_code=404, detail="Topology image not found") -def stop_scenario(scenario_name): +def stop_scenario(scenario_name, user): from nebula.scenarios import ScenarioManagement - ScenarioManagement.stop_participants() + ScenarioManagement.stop_participants(scenario_name) + DockerUtils.remove_containers_by_prefix(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{user}-participant") + DockerUtils.remove_docker_network( + f"{(os.environ.get('NEBULA_CONTROLLER_NAME'))}-{str(user).lower()}-nebula-net-scenario" + ) ScenarioManagement.stop_blockchain() scenario_set_status_to_finished(scenario_name) # Generate statistics for the scenario - path = Utils.check_path(settings.log_dir, scenario_name) + path = FileUtils.check_path(settings.log_dir, scenario_name) ScenarioManagement.generate_statistics(path) @@ -791,33 +903,37 @@ async def nebula_stop_scenario( session: dict = Depends(get_session), ): if "user" in session: + user = get_user_by_scenario_name(scenario_name) + user_data = user_data_store[user] + if session["role"] == "demo": raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) - elif session["role"] == "user": - if not check_scenario_with_role(session["role"], scenario_name): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + # elif session["role"] == "user": + # if not check_scenario_with_role(session["role"], scenario_name): + # raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) if stop_all: - stop_all_scenarios_event.set() - global scenarios_list_length - global scenarios_finished - scenarios_list_length = 0 - scenarios_finished = 0 - stop_scenario(scenario_name) + user_data.stop_all_scenarios_event.set() + user_data.scenarios_list_length = 0 + user_data.scenarios_finished = 0 + stop_scenario(scenario_name, user) else: - finish_scenario_event.set() - stop_scenario(scenario_name) + 
@@ -791,33 +903,37 @@ async def nebula_stop_scenario(
     session: dict = Depends(get_session),
 ):
     if "user" in session:
+        user = get_user_by_scenario_name(scenario_name)
+        user_data = user_data_store[user]
+
         if session["role"] == "demo":
             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
-        elif session["role"] == "user":
-            if not check_scenario_with_role(session["role"], scenario_name):
-                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
+        # elif session["role"] == "user":
+        #     if not check_scenario_with_role(session["role"], scenario_name):
+        #         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
         if stop_all:
-            stop_all_scenarios_event.set()
-            global scenarios_list_length
-            global scenarios_finished
-            scenarios_list_length = 0
-            scenarios_finished = 0
-            stop_scenario(scenario_name)
+            user_data.stop_all_scenarios_event.set()
+            user_data.scenarios_list_length = 0
+            user_data.scenarios_finished = 0
+            stop_scenario(scenario_name, user)
         else:
-            finish_scenario_event.set()
-            stop_scenario(scenario_name)
+            user_data.finish_scenario_event.set()
+            user_data.scenarios_list_length -= 1
+            stop_scenario(scenario_name, user)
         return RedirectResponse(url="/nebula/dashboard")
     else:
         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)


-def remove_scenario(scenario_name=None):
+def remove_scenario(scenario_name=None, user=None):
     from nebula.scenarios import ScenarioManagement

+    user_data = user_data_store[user]
+
     if settings.advanced_analytics:
         logging.info("Advanced analytics enabled")
     # Remove registered nodes and conditions
-    nodes_registration.pop(scenario_name, None)
+    user_data.nodes_registration.pop(scenario_name, None)
     remove_nodes_by_scenario_name(scenario_name)
     remove_scenario_by_name(scenario_name)
     remove_note(scenario_name)
@@ -828,7 +944,7 @@ async def nebula_relaunch_scenario(
     scenario_name: str, background_tasks: BackgroundTasks, session: dict = Depends(get_session)
 ):
-    global scenarios_list, scenarios_list_length, scenarios_finished
+    user_data = user_data_store[session["user"]]

     if "user" in session:
         if session["role"] == "demo":
@@ -837,19 +953,19 @@ async def nebula_relaunch_scenario(
             if not check_scenario_with_role(session["role"], scenario_name):
                 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

-        scenario_path = Utils.check_path(settings.config_dir, os.path.join(scenario_name, "scenario.json"))
+        scenario_path = FileUtils.check_path(settings.config_dir, os.path.join(scenario_name, "scenario.json"))
         with open(scenario_path) as scenario_file:
             scenario = json.load(scenario_file)

-        scenarios_list_length = scenarios_list_length + 1
+        user_data.scenarios_list_length = user_data.scenarios_list_length + 1

-        if scenarios_list_length == 1:
-            scenarios_finished = 0
-            scenarios_list.clear()
-            scenarios_list.append(scenario)
-            background_tasks.add_task(run_scenarios, session["role"])
+        if user_data.scenarios_list_length == 1:
+            user_data.scenarios_finished = 0
+            user_data.scenarios_list.clear()
+            user_data.scenarios_list.append(scenario)
+            background_tasks.add_task(run_scenarios, session["role"], session["user"])
         else:
-            scenarios_list.append(scenario)
+            user_data.scenarios_list.append(scenario)

         return RedirectResponse(url="/nebula/dashboard", status_code=303)
     else:
@@ -857,14 +973,14 @@ async def nebula_relaunch_scenario(

 @app.get("/nebula/dashboard/{scenario_name}/remove")
-async def nebula_remove_scenario(scenario_name: str, request: Request, session: dict = Depends(get_session)):
+async def nebula_remove_scenario(scenario_name: str, session: dict = Depends(get_session)):
     if "user" in session:
         if session["role"] == "demo":
             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
         elif session["role"] == "user":
             if not check_scenario_with_role(session["role"], scenario_name):
                 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
-        remove_scenario(scenario_name)
+        remove_scenario(scenario_name, session["user"])
         return RedirectResponse(url="/nebula/dashboard")
     else:
         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
@@ -967,8 +1083,8 @@ async def nebula_dashboard_download_logs_metrics(
     scenario_name: str, request: Request, session: dict = Depends(get_session)
 ):
     if "user" in session:
-        log_folder = Utils.check_path(settings.log_dir, scenario_name)
-        config_folder = Utils.check_path(settings.config_dir, scenario_name)
+        log_folder = FileUtils.check_path(settings.log_dir, scenario_name)
+        config_folder = FileUtils.check_path(settings.config_dir, scenario_name)
         if os.path.exists(log_folder) and os.path.exists(config_folder):
             # Create a zip file with the logs and the configuration files, and send it to the user
             memory_file = io.BytesIO()
@@ -1078,32 +1194,26 @@ def mobility_assign(nodes, mobile_participants_percent):
     return nodes


-# Stop all scenarios in the scenarios_list
-stop_all_scenarios_event = asyncio.Event()
-
-# Finish actual scenario
-finish_scenario_event = asyncio.Event()
-
-# Nodes that completed the experiment
-nodes_finished = []
-
-
 # Receive a stopped node
 @app.post("/nebula/dashboard/{scenario_name}/node/done")
 async def node_stopped(scenario_name: str, request: Request):
+    user = get_user_by_scenario_name(scenario_name)
+    user_data = user_data_store[user]
+
     if request.headers.get("content-type") == "application/json":
         data = await request.json()
-        nodes_finished.append(data["idx"])
+        user_data.nodes_finished.append(data["idx"])
         nodes_list = list_nodes_by_scenario_name(scenario_name)
         finished = True
         # Check if all the nodes of the scenario have finished the experiment
         for node in nodes_list:
-            if str(node[1]) not in map(str, nodes_finished):
+            if str(node[1]) not in map(str, user_data.nodes_finished):
                 finished = False

         if finished:
-            nodes_finished.clear()
-            finish_scenario_event.set()
+            stop_scenario(scenario_name, user)
+            user_data.nodes_finished.clear()
+            user_data.finish_scenario_event.set()
             return JSONResponse(
                 status_code=200,
                 content={"message": "All nodes finished, scenario marked as completed."},
@@ -1117,16 +1227,37 @@ async def node_stopped(scenario_name: str, request: Request):
         raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)


-async def run_scenario(scenario_data, role):
+async def assign_available_gpu(scenario_data, role):
+    if scenario_data["accelerator"] == "cpu":
+        scenario_data["gpu_id"] = []
+    else:
+        response = await get_available_gpus()
+        available_gpus = response.get("available_gpus")
+        if len(available_gpus) > 0:
+            if role == "user":
+                scenario_data["gpu_id"] = [available_gpus.pop()]
+            elif role == "admin":
+                scenario_data["gpu_id"] = available_gpus
+        else:
+            scenario_data["gpu_id"] = []
+
+    return scenario_data
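To make the allocation policy above concrete, here is a hedged, standalone sketch; fake_get_available_gpus is a hypothetical stand-in for the controller's get_available_gpus() awaited by the real code:

    import asyncio

    async def fake_get_available_gpus():
        # Stand-in for the controller query; pretend GPUs 0 and 1 are free
        return {"available_gpus": [0, 1]}

    async def assign(scenario_data, role):
        if scenario_data["accelerator"] == "cpu":
            scenario_data["gpu_id"] = []
        else:
            available_gpus = (await fake_get_available_gpus()).get("available_gpus")
            if len(available_gpus) > 0:
                if role == "user":
                    scenario_data["gpu_id"] = [available_gpus.pop()]  # one GPU per user scenario
                elif role == "admin":
                    scenario_data["gpu_id"] = available_gpus  # admins may claim every free GPU
            else:
                scenario_data["gpu_id"] = []
        return scenario_data

    print(asyncio.run(assign({"accelerator": "gpu"}, "user")))   # {'accelerator': 'gpu', 'gpu_id': [1]}
    print(asyncio.run(assign({"accelerator": "gpu"}, "admin")))  # {'accelerator': 'gpu', 'gpu_id': [0, 1]}
    print(asyncio.run(assign({"accelerator": "cpu"}, "user")))   # {'accelerator': 'cpu', 'gpu_id': []}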
+
+
+async def run_scenario(scenario_data, role, user):
     import subprocess

     from nebula.scenarios import ScenarioManagement

+    user_data = user_data_store[user]
+
+    scenario_data = await assign_available_gpu(scenario_data, role)
     # Manager for the actual scenario
-    scenarioManagement = ScenarioManagement(scenario_data)
+    scenarioManagement = ScenarioManagement(scenario_data, user)

     scenario_update_record(
         scenario_name=scenarioManagement.scenario_name,
+        username=user,
         start_time=scenarioManagement.start_date_scenario,
         end_time="",
         status="running",
@@ -1137,6 +1268,7 @@ async def run_scenario(scenario_data, role):
         dataset=scenario_data["dataset"],
         rounds=scenario_data["rounds"],
         role=role,
+        gpu_id=json.dumps(scenario_data["gpu_id"])
     )

     # Run the actual scenario
@@ -1153,37 +1285,39 @@ async def run_scenario(scenario_data, role):
             logging.exception(f"Error docker-compose up: {e}")
             return

-    nodes_registration[scenarioManagement.scenario_name] = {
+    user_data.nodes_registration[scenarioManagement.scenario_name] = {
         "n_nodes": scenario_data["n_nodes"],
         "nodes": set(),
     }

-    nodes_registration[scenarioManagement.scenario_name]["condition"] = asyncio.Condition()
-
-    return scenarioManagement.scenario_name
+    user_data.nodes_registration[scenarioManagement.scenario_name]["condition"] = asyncio.Condition()


 # Deploy the list of scenarios
-async def run_scenarios(role):
+async def run_scenarios(role, user):
     try:
-        global scenarios_finished, scenarios_list_length
-        for scenario_data in scenarios_list:
-            finish_scenario_event.clear()
+        user_data = user_data_store[user]
+
+        for scenario_data in user_data.scenarios_list:
+            user_data.finish_scenario_event.clear()
             logging.info(f"Running scenario {scenario_data['scenario_title']}")
-            scenario_name = await run_scenario(scenario_data, role)
+            await run_scenario(scenario_data, role, user)
             # Wait until the scenario is completed
-            while not finish_scenario_event.is_set() and not stop_all_scenarios_event.is_set():
+            while not user_data.finish_scenario_event.is_set() and not user_data.stop_all_scenarios_event.is_set():
+                await asyncio.sleep(1)
+
+            # Wait until there are enough resources to launch the next scenario
+            while not await check_enough_resources():
                 await asyncio.sleep(1)
-            if stop_all_scenarios_event.is_set():
-                stop_all_scenarios_event.clear()
-                scenarios_list_length = 0
-                stop_scenario(scenario_name)
+
+            if user_data.stop_all_scenarios_event.is_set():
+                user_data.stop_all_scenarios_event.clear()
+                user_data.scenarios_list_length = 0
                 return
-            scenarios_finished += 1
-            stop_scenario(scenario_name)
+            user_data.scenarios_finished += 1
             await asyncio.sleep(5)
     finally:
-        scenarios_list_length = 0
+        user_data.scenarios_list_length = 0


 @app.post("/nebula/dashboard/deployment/run")
@@ -1192,24 +1326,31 @@ async def nebula_dashboard_deployment_run(
     background_tasks: BackgroundTasks,
     session: dict = Depends(get_session),
 ):
-    if "user" not in session or session["role"] in ["demo", "user"] and get_running_scenario():
+    enough_resources = await check_enough_resources()
+
+    if not enough_resources or "user" not in session or session["role"] in ["demo"] and get_running_scenario():
         raise HTTPException(status_code=401)

     if request.headers.get("content-type") != "application/json":
         raise HTTPException(status_code=401)

     data = await request.json()
-    global scenarios_finished, scenarios_list_length, scenarios_list
-    if scenarios_list_length < 1:
-        scenarios_finished = 0
-        scenarios_list_length = len(data)
-        scenarios_list = data
-        background_tasks.add_task(run_scenarios, session["role"])
+    user_data = user_data_store[session["user"]]
+
+    if user_data.scenarios_list_length < 1:
+        user_data.scenarios_finished = 0
+        user_data.scenarios_list_length = len(data)
+        user_data.scenarios_list = data
+        background_tasks.add_task(run_scenarios, session["role"], session["user"])
+        try:
+            asyncio.create_task(monitor_resources(session["user"]))
+        except Exception as e:
+            logging.exception(f"Error creating monitoring background_task {e}")
     else:
-        scenarios_list_length += len(data)
-        scenarios_list.extend(data)
+        user_data.scenarios_list_length += len(data)
+        user_data.scenarios_list.extend(data)
     await asyncio.sleep(3)
-    logging.info(f"Running deployment with {len(data)} scenarios")
+    logging.info(f"Running deployment with {len(data)} new scenarios (queue length: {user_data.scenarios_list_length})")
     return RedirectResponse(url="/nebula/dashboard", status_code=303)
     # return Response(content="Success", status_code=200)

diff --git a/nebula/frontend/config/participant.json.example b/nebula/frontend/config/participant.json.example
index c6826612..db761866 100755
--- a/nebula/frontend/config/participant.json.example
+++ b/nebula/frontend/config/participant.json.example
@@ -22,6 +22,7 @@
         "malicious": false,
         "start": false,
         "accelerator": "cpu",
+        "gpu_id": null,
         "devices": 2,
         "strategy": "ddp",
         "logging": false
diff --git a/nebula/frontend/database.py b/nebula/frontend/database.py
index 0a052108..dbeff58e 100755
--- a/nebula/frontend/database.py
+++ b/nebula/frontend/database.py
@@ -29,6 +29,15 @@ async def setup_database(db_file_location):
             await db.commit()


+async def ensure_columns(conn, table_name, desired_columns):
+    _c = await conn.execute(f"PRAGMA table_info({table_name});")
+    existing_columns = [row[1] for row in await _c.fetchall()]
+    for column_name, column_definition in desired_columns.items():
+        if column_name not in existing_columns:
+            await conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition};")
+    await conn.commit()
+
+
 async def initialize_databases():
     await setup_database(user_db_file_location)
     await setup_database(node_db_file_location)
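ensure_columns is an additive migration: it reads PRAGMA table_info and ALTERs in any column that is missing, leaving existing rows untouched. A minimal sanity check against an in-memory database (hypothetical table; assumes ensure_columns from this module is in scope, with aiosqlite as imported above):

    import asyncio
    import aiosqlite

    async def demo():
        async with aiosqlite.connect(":memory:") as conn:
            await conn.execute("CREATE TABLE scenarios (name TEXT PRIMARY KEY);")
            # Existing columns are skipped; only the missing ones are added
            await ensure_columns(conn, "scenarios", {
                "name": "TEXT PRIMARY KEY",
                "username": "TEXT",
                "gpu_id": "TEXT",
            })
            cursor = await conn.execute("PRAGMA table_info(scenarios);")
            print([row[1] for row in await cursor.fetchall()])  # ['name', 'username', 'gpu_id']

    asyncio.run(demo())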
@@ -36,73 +45,103 @@ async def initialize_databases():
     await setup_database(notes_db_file_location)

     async with aiosqlite.connect(user_db_file_location) as conn:
-        _c = await conn.cursor()
-        await _c.execute(
+        await conn.execute(
             """
             CREATE TABLE IF NOT EXISTS users (
                 user TEXT PRIMARY KEY,
-                password TEXT NOT NULL,
-                role TEXT NOT NULL
+                password TEXT,
+                role TEXT
             );
             """
         )
-        await conn.commit()
+        desired_columns = {"user": "TEXT PRIMARY KEY", "password": "TEXT", "role": "TEXT"}
+        await ensure_columns(conn, "users", desired_columns)

     async with aiosqlite.connect(node_db_file_location) as conn:
-        _c = await conn.cursor()
-        await _c.execute(
+        await conn.execute(
             """
             CREATE TABLE IF NOT EXISTS nodes (
                 uid TEXT PRIMARY KEY,
-                idx TEXT NOT NULL,
-                ip TEXT NOT NULL,
-                port TEXT NOT NULL,
-                role TEXT NOT NULL,
-                neighbors TEXT NOT NULL,
-                latitude TEXT NOT NULL,
-                longitude TEXT NOT NULL,
-                timestamp TEXT NOT NULL,
-                federation TEXT NOT NULL,
-                round TEXT NOT NULL,
-                scenario TEXT NOT NULL,
-                hash TEXT NOT NULL
+                idx TEXT,
+                ip TEXT,
+                port TEXT,
+                role TEXT,
+                neighbors TEXT,
+                latitude TEXT,
+                longitude TEXT,
+                timestamp TEXT,
+                federation TEXT,
+                round TEXT,
+                scenario TEXT,
+                hash TEXT
             );
             """
         )
-        await conn.commit()
+        desired_columns = {
+            "uid": "TEXT PRIMARY KEY",
+            "idx": "TEXT",
+            "ip": "TEXT",
+            "port": "TEXT",
+            "role": "TEXT",
+            "neighbors": "TEXT",
+            "latitude": "TEXT",
+            "longitude": "TEXT",
+            "timestamp": "TEXT",
+            "federation": "TEXT",
+            "round": "TEXT",
+            "scenario": "TEXT",
+            "hash": "TEXT",
+        }
+        await ensure_columns(conn, "nodes", desired_columns)

     async with aiosqlite.connect(scenario_db_file_location) as conn:
-        _c = await conn.cursor()
-        await _c.execute(
+        await conn.execute(
             """
             CREATE TABLE IF NOT EXISTS scenarios (
                 name TEXT PRIMARY KEY,
-                start_time TEXT NOT NULL,
-                end_time TEXT NOT NULL,
-                title TEXT NOT NULL,
-                description TEXT NOT NULL,
-                status TEXT NOT NULL,
-                network_subnet TEXT NOT NULL,
-                model TEXT NOT NULL,
-                dataset TEXT NOT NULL,
-                rounds TEXT NOT NULL,
-                role TEXT NOT NULL
+                start_time TEXT,
+                end_time TEXT,
+                title TEXT,
+                description TEXT,
+                status TEXT,
+                network_subnet TEXT,
+                model TEXT,
+                dataset TEXT,
+                rounds TEXT,
+                role TEXT,
+                username TEXT,
+                gpu_id TEXT
             );
             """
         )
-        await conn.commit()
+        desired_columns = {
+            "name": "TEXT PRIMARY KEY",
+            "start_time": "TEXT",
+            "end_time": "TEXT",
+            "title": "TEXT",
+            "description": "TEXT",
+            "status": "TEXT",
+            "network_subnet": "TEXT",
+            "model": "TEXT",
+            "dataset": "TEXT",
+            "rounds": "TEXT",
+            "role": "TEXT",
+            "username": "TEXT",
+            "gpu_id": "TEXT",
+        }
+        await ensure_columns(conn, "scenarios", desired_columns)

     async with aiosqlite.connect(notes_db_file_location) as conn:
-        _c = await conn.cursor()
-        await _c.execute(
+        await conn.execute(
             """
             CREATE TABLE IF NOT EXISTS notes (
                 scenario TEXT PRIMARY KEY,
-                scenario_notes TEXT NOT NULL
+                scenario_notes TEXT
             );
             """
         )
-        await conn.commit()
+        desired_columns = {"scenario": "TEXT PRIMARY KEY", "scenario_notes": "TEXT"}
+        await ensure_columns(conn, "notes", desired_columns)


 def list_users(all_info=False):
@@ -321,51 +360,91 @@ def get_run_hashes_scenario(scenario_name):
         return result_hashes


-def get_all_scenarios(sort_by="start_time"):
+def get_all_scenarios(username, role, sort_by="start_time"):
     with sqlite3.connect(scenario_db_file_location) as conn:
         conn.row_factory = sqlite3.Row
         c = conn.cursor()
-        if sort_by == "start_time":
-            command = """
-            SELECT * FROM scenarios
-            ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
-            """
-            c.execute(command)
+        if role == "admin":
+            if sort_by == "start_time":
+                command = """
+                SELECT * FROM scenarios
+                ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
+                """
+                c.execute(command)
+            else:
+                command = "SELECT * FROM scenarios ORDER BY ?;"
+                c.execute(command, (sort_by,))
         else:
-            command = "SELECT * FROM scenarios ORDER BY ?;"
-            c.execute(command, (sort_by,))
+            if sort_by == "start_time":
+                command = """
+                SELECT * FROM scenarios
+                WHERE username = ?
+                ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
+                """
+                c.execute(command, (username,))
+            else:
+                command = "SELECT * FROM scenarios WHERE username = ? ORDER BY ?;"
+                c.execute(
+                    command,
+                    (
+                        username,
+                        sort_by,
+                    ),
+                )
         result = c.fetchall()

         return result


-def get_all_scenarios_and_check_completed(sort_by="start_time"):
+def get_all_scenarios_and_check_completed(username, role, sort_by="start_time"):
     with sqlite3.connect(scenario_db_file_location) as _conn:
         _conn.row_factory = sqlite3.Row
         _c = _conn.cursor()
-        if sort_by == "start_time":
-            command = """
-            SELECT * FROM scenarios
-            ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
-            """
-            _c.execute(command)
+
+        if role == "admin":
+            if sort_by == "start_time":
+                command = """
+                SELECT * FROM scenarios
+                ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
+                """
+                _c.execute(command)
+            else:
+                command = "SELECT * FROM scenarios ORDER BY ?;"
+                _c.execute(command, (sort_by,))
+            # _c.execute(command)
+            result = _c.fetchall()
         else:
-            command = "SELECT * FROM scenarios ORDER BY ?;"
-            _c.execute(command, (sort_by,))
-        _c.execute(command)
-        result = _c.fetchall()
+            if sort_by == "start_time":
+                command = """
+                SELECT * FROM scenarios
+                WHERE username = ?
+                ORDER BY strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2) || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8));
+                """
+                _c.execute(command, (username,))
+            else:
+                command = "SELECT * FROM scenarios WHERE username = ? ORDER BY ?;"
+                _c.execute(
+                    command,
+                    (
+                        username,
+                        sort_by,
+                    ),
+                )
+            # _c.execute(command)
+            result = _c.fetchall()

         for scenario in result:
             if scenario["status"] == "running":
                 if check_scenario_federation_completed(scenario["name"]):
                     scenario_set_status_to_completed(scenario["name"])
-                    result = get_all_scenarios()
+                    result = get_all_scenarios(username, role)

         return result


 def scenario_update_record(
     scenario_name,
+    username,
     start_time,
     end_time,
     title,
@@ -376,6 +455,7 @@ def scenario_update_record(
     dataset,
     rounds,
     role,
+    gpu_id
 ):
     _conn = sqlite3.connect(scenario_db_file_location)
     _c = _conn.cursor()
@@ -387,7 +467,7 @@ def scenario_update_record(
     if result is None:
         # Create a new record
         _c.execute(
-            "INSERT INTO scenarios VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            "INSERT INTO scenarios VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
             (
                 scenario_name,
                 start_time,
@@ -400,6 +480,8 @@ def scenario_update_record(
                 dataset,
                 rounds,
                 role,
+                username,
+                gpu_id,
             ),
         )
     else:
@@ -463,12 +545,21 @@ def scenario_set_status_to_completed(scenario_name):
         print(f"Database error: {e}")


-def get_running_scenario():
+def get_running_scenario(username=None):
     with sqlite3.connect(scenario_db_file_location) as conn:
         conn.row_factory = sqlite3.Row
         c = conn.cursor()
-        command = "SELECT * FROM scenarios WHERE status = ? OR status = ?;"
-        c.execute(command, ("running", "completed"))
+
+        if username:
+            command = """
+                SELECT * FROM scenarios
+                WHERE (status = ? OR status = ?) AND username = ?;
+            """
+            c.execute(command, ("running", "completed", username))
+        else:
+            command = "SELECT * FROM scenarios WHERE status = ? OR status = ?;"
+            c.execute(command, ("running", "completed"))
+
         result = c.fetchone()

         return result
@@ -498,6 +589,19 @@ def get_scenario_by_name(scenario_name):
     return result


+def get_user_by_scenario_name(scenario_name):
+    _conn = sqlite3.connect(scenario_db_file_location)
+    _c = _conn.cursor()
+    command = "SELECT username FROM scenarios WHERE name = ?;"
+    _c.execute(command, (scenario_name,))
+    result = _c.fetchone()
+
+    _conn.commit()
+    _conn.close()
+
+    return result[0] if result else None
+
+
 def remove_scenario_by_name(scenario_name):
     _conn = sqlite3.connect(scenario_db_file_location)
     _c = _conn.cursor()
diff --git a/nebula/frontend/templates/admin.html b/nebula/frontend/templates/admin.html
index 978804c8..5a682899 100755
--- a/nebula/frontend/templates/admin.html
+++ b/nebula/frontend/templates/admin.html
@@ -9,30 +9,18 @@

 Admin Dashboard

-{# Display error message if any #}
-{% if id_to_add_is_duplicated or id_to_add_is_invalid or id_to_change_password_not_exists %}
-    Warning!
-    {% if id_to_add_is_duplicated %}
-    The account name already exists.
-    {% elif id_to_add_is_invalid %}
-    The account name is invalid.
-    {% endif %}
-{% endif %}
-{% endif %}
-{% if scenarios_list_length > 1 %}
+<table>
+    <thead>
+        <tr>
+            <th>User</th>
+            <th>scenarios_finished/scenarios_list_length</th>
+        </tr>
+    </thead>
+    <tbody>
+        {% for user, user_data in user_data_store.items() %}
+        <tr>
+            <td>{{ user }}</td>
+            <td>{{ user_data.scenarios_finished }}{{ "/" }}{{ user_data.scenarios_list_length }}</td>
+        </tr>
+        {% endfor %}
+    </tbody>
+</table>
+{% if scenario_running %}

 Scenarios queue {{ scenarios_finished }}/{{ scenarios_list_length }}

 {% if scenarios_finished != scenarios_list_length %}
-Stop scenario queue
 {% endif %}
@@ -91,17 +115,6 @@

 There are no deployed scenarios

 Compare scenarios
 {% endif %}
-{% if scenarios_list_length is defined and scenarios_list_length > 1 %}
-
-Scenarios queue {{ scenarios_finished }}/{{ scenarios_list_length }}
-
-{% if scenarios_finished != scenarios_list_length %}
-Stop scenario queue
-{% endif %}
-
-{% endif %}
 {% endif %}
@@ -116,6 +129,9 @@

 Scenarios in the database

+                {% if user_role == "admin" %}
+                <th>User</th>
+                {% endif %}
                 <th>Title</th>
                 <th>Start time</th>
                 <th>Model</th>
@@ -126,8 +142,11 @@

 Scenarios in the database

                 {% for name, start_time, end_time, title, description, status, network_subnet, model, dataset,
-                rounds, role in scenarios %}
+                rounds, role, username, gpu_id in scenarios %}
+                {% if user_role == "admin" %}
+                <td>{{ username|lower }}</td>
+                {% endif %}
                 <td>{{ title }}</td>
                 <td>{{ start_time }}</td>
                 <td>{{ model }}</td>
@@ -155,7 +174,7 @@

 Scenarios in the database

                 {% elif status == "completed" %}
                 Stop scenario
-                Stop scenario queue
                 {% else %}

 Schema of deployment

     var scenariosList = [];
     var actual_scenario = 0;

+    // Replace a scenario by index
+    function setScenario(index, newScenario) {
+        if (index >= 0 && index < scenariosList.length) {
+            scenariosList[index] = newScenario;
+        }
+    }
+
     // Save current scenario
     function saveScenario(){
         // Save scenario data
@@ -2103,10 +2110,15 @@ Schema of deployment
document.getElementById("yes-button").addEventListener("click", function () { if(scenarioStorage.scenariosList < 1) { - document.getElementById("scenario-title").value = "scenario" + document.getElementById("scenario-title").value = "empty" } scenarioStorage.replaceScenario(); + scenarioStorage.scenariosList.forEach((scenario, index) => { + if (!scenario.scenario_title){ + scenarioStorage.scenariosList[index].scenario_title = "empty"; + } + }); var data = scenarioStorage.scenariosList; // Hide the modal @@ -2129,7 +2141,7 @@
Schema of deployment
document.getElementById("spinner").style.display = "none"; // If the user is a demo, show a modal with a message $('#confirm-modal').on('hidden.bs.modal', function () { - $('#info-modal-body').html('You are not allowed to run a scenario. You have a limited functionality or a scenario is already running'); + $('#info-modal-body').html('You are not authorized to run a scenario. Your access is limited, or you may need to wait until more scenarios can be launched'); $('#info-modal').modal('show'); }); } diff --git a/nebula/frontend/utils.py b/nebula/frontend/utils.py deleted file mode 100644 index 4d6b1c06..00000000 --- a/nebula/frontend/utils.py +++ /dev/null @@ -1,15 +0,0 @@ -import os - - -class Utils: - def __init__(self): - self.init() - - @classmethod - def check_path(cls, base_path, relative_path): - full_path = os.path.normpath(os.path.join(base_path, relative_path)) - base_path = os.path.normpath(base_path) - - if not full_path.startswith(base_path): - raise Exception("Not allowed") - return full_path diff --git a/nebula/scenarios.py b/nebula/scenarios.py index b3829fbd..ea9dffc1 100644 --- a/nebula/scenarios.py +++ b/nebula/scenarios.py @@ -7,7 +7,6 @@ import shutil import subprocess import sys -import textwrap import time from datetime import datetime @@ -18,7 +17,7 @@ from nebula.addons.topologymanager import TopologyManager from nebula.config.config import Config from nebula.core.utils.certificate import generate_ca_certificate, generate_certificate -from nebula.frontend.utils import Utils +from nebula.utils import DockerUtils, FileUtils # Definition of a scenario @@ -44,6 +43,7 @@ def __init__( logginglevel, report_status_data_queue, accelerator, + gpu_id, network_subnet, network_gateway, epochs, @@ -90,6 +90,7 @@ def __init__( logginglevel (str): Logging level. report_status_data_queue (bool): Indicator to report information about the nodes of the scenario accelerator (str): Accelerator used. + gpu_id (list) : Id list of the used gpu network_subnet (str): Network subnet. network_gateway (str): Network gateway. epochs (int): Number of epochs. 
@@ -130,6 +131,7 @@ def __init__(
         self.logginglevel = logginglevel
         self.report_status_data_queue = report_status_data_queue
         self.accelerator = accelerator
+        self.gpu_id = gpu_id
         self.network_subnet = network_subnet
         self.network_gateway = network_gateway
         self.epochs = epochs
@@ -233,9 +235,11 @@ def from_dict(cls, data):

 # Class to manage the current scenario
 class ScenarioManagement:
-    def __init__(self, scenario):
+    def __init__(self, scenario, user=None):
         # Current scenario
         self.scenario = Scenario.from_dict(scenario)
+        # UID of the user
+        self.user = user
         # Scenario management settings
         self.start_date_scenario = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
         self.scenario_name = f"nebula_{self.scenario.federation}_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}"
@@ -249,7 +253,7 @@ def __init__(self, scenario):

         # Assign the controller endpoint
         if self.scenario.deployment == "docker":
-            self.controller = "nebula-frontend"
+            self.controller = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-frontend"
         else:
             self.controller = f"127.0.0.1:{os.environ.get('NEBULA_FRONTEND_PORT')}"

@@ -337,6 +341,7 @@ def __init__(self, scenario):
             participant_config["model_args"]["model"] = self.scenario.model
             participant_config["training_args"]["epochs"] = int(self.scenario.epochs)
             participant_config["device_args"]["accelerator"] = self.scenario.accelerator
+            participant_config["device_args"]["gpu_id"] = self.scenario.gpu_id
             participant_config["device_args"]["logging"] = self.scenario.logginglevel
             participant_config["aggregator_args"]["algorithm"] = self.scenario.agg_algorithm
             participant_config["adversarial_args"]["attacks"] = node_config["attacks"]
@@ -379,7 +384,7 @@ def stop_blockchain():
             logging.exception("Docker Compose failed to stop blockchain or blockchain already exited.")

     @staticmethod
-    def stop_participants():
+    def stop_participants(scenario_name=None):
         # When stopping the nodes, we need to remove the current_scenario_commands.sh file -> it will cause the nodes to stop using PIDs
         try:
             nebula_config_dir = os.environ.get("NEBULA_CONFIG_DIR")
@@ -388,47 +393,32 @@ def stop_participants():
                 nebula_base_dir = os.path.abspath(os.path.join(current_dir, ".."))
                 nebula_config_dir = os.path.join(nebula_base_dir, "app", "config")
                 logging.info(f"NEBULA_CONFIG_DIR not found. Using default path: {nebula_config_dir}")
-            if os.environ.get("NEBULA_HOST_PLATFORM") == "windows":
-                scenario_commands_file = os.path.join(nebula_config_dir, "current_scenario_commands.ps1")
+
+            if scenario_name:
+                if os.environ.get("NEBULA_HOST_PLATFORM") == "windows":
+                    scenario_commands_file = os.path.join(
+                        nebula_config_dir, scenario_name, "current_scenario_commands.ps1"
+                    )
+                else:
+                    scenario_commands_file = os.path.join(
+                        nebula_config_dir, scenario_name, "current_scenario_commands.sh"
+                    )
+                if os.path.exists(scenario_commands_file):
+                    os.remove(scenario_commands_file)
             else:
-                scenario_commands_file = os.path.join(nebula_config_dir, "current_scenario_commands.sh")
-            if os.path.exists(scenario_commands_file):
-                os.remove(scenario_commands_file)
+                if os.environ.get("NEBULA_HOST_PLATFORM") == "windows":
+                    files = glob.glob(
+                        os.path.join(nebula_config_dir, "**/current_scenario_commands.ps1"), recursive=True
+                    )
+                else:
+                    files = glob.glob(
+                        os.path.join(nebula_config_dir, "**/current_scenario_commands.sh"), recursive=True
+                    )
+                for file in files:
+                    os.remove(file)
         except Exception as e:
             logging.exception(f"Error while removing current_scenario_commands.sh file: {e}")

-        if sys.platform == "win32":
-            try:
-                # kill all the docker containers which contain the word "nebula-core"
-                commands = [
-                    """docker kill $(docker ps -q --filter ancestor=nebula-core) | Out-Null""",
-                    """docker rm $(docker ps -a -q --filter ancestor=nebula-core) | Out-Null""",
-                    r"""docker network rm $(docker network ls | Where-Object { ($_ -split '\s+')[1] -like 'nebula-net-scenario' } | ForEach-Object { ($_ -split '\s+')[0] }) | Out-Null""",
-                ]
-
-                for command in commands:
-                    time.sleep(1)
-                    exit_code = os.system(f'powershell.exe -Command "{command}"')
-                    # logging.info(f"Windows Command '{command}' executed with exit code: {exit_code}")
-
-            except Exception as e:
-                raise Exception(f"Error while killing docker containers: {e}")
-        else:
-            try:
-                commands = [
-                    """docker kill $(docker ps -q --filter ancestor=nebula-core) > /dev/null 2>&1""",
-                    """docker rm $(docker ps -a -q --filter ancestor=nebula-core) > /dev/null 2>&1""",
-                    """docker network rm $(docker network ls | grep nebula-net-scenario | awk '{print $1}') > /dev/null 2>&1""",
-                ]
-
-                for command in commands:
-                    time.sleep(1)
-                    exit_code = os.system(command)
-                    # logging.info(f"Linux Command '{command}' executed with exit code: {exit_code}")
-
-            except Exception as e:
-                raise Exception(f"Error while killing docker containers: {e}")
-
     @staticmethod
     def stop_nodes():
         logging.info("Closing NEBULA nodes... Please wait")
Please wait") @@ -653,174 +643,65 @@ def start_blockchain(self): raise e def start_nodes_docker(self): - import subprocess - - try: - # First, get the list of IDs of exited containers - result_ps = subprocess.run( - "docker ps -aq -f status=exited --filter 'name=nebula'", - shell=True, - check=True, - capture_output=True, - text=True, - ) - - # Get the container IDs - container_ids = result_ps.stdout.strip() - - if container_ids: - # Run the command to remove the containers - result_rm = subprocess.run( - "docker rm $(docker ps -aq -f status=exited --filter 'name=nebula')", - shell=True, - check=True, - capture_output=True, - text=True, - ) - print(f"Dangling containers removed successfully: {result_rm.stdout.strip()}.") - else: - print("No dangling containers to remove.") - except subprocess.CalledProcessError as e: - print(f"Error removing stopped containers: {e}") - print(f"Error output: {e.stderr}") - except Exception as e: - print(f"Unexpected error: {e}") - logging.info("Starting nodes using Docker Compose...") logging.info(f"env path: {self.env_path}") - docker_compose_template = textwrap.dedent( - """ - services: - {} - """ - ) + network_name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{str(self.user).lower()}-nebula-net-scenario" - participant_template = textwrap.dedent( - """ - participant{}: - image: nebula-core - restart: no - volumes: - - {}:/nebula - - /var/run/docker.sock:/var/run/docker.sock - extra_hosts: - - "host.docker.internal:host-gateway" - ipc: host - privileged: true - command: - - /bin/bash - - -c - - | - {} && ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python /nebula/nebula/node.py {} - networks: - nebula-net-scenario: - ipv4_address: {} - nebula-net-base: - {} - """ - ) - participant_template = textwrap.indent(participant_template, " " * 4) - - participant_gpu_template = textwrap.dedent( - """ - participant{}: - image: nebula-core - environment: - - NVIDIA_DISABLE_REQUIRE=true - restart: no - volumes: - - {}:/nebula - - /var/run/docker.sock:/var/run/docker.sock - extra_hosts: - - "host.docker.internal:host-gateway" - ipc: host - privileged: true - command: - - /bin/bash - - -c - - | - {} && ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python /nebula/nebula/node.py {} - deploy: - resources: - reservations: - devices: - - driver: nvidia - count: all - capabilities: [gpu] - networks: - nebula-net-scenario: - ipv4_address: {} - nebula-net-base: - {} - """ - ) - participant_gpu_template = textwrap.indent(participant_gpu_template, " " * 4) - - network_template = textwrap.dedent( - """ - networks: - nebula-net-scenario: - name: nebula-net-scenario - driver: bridge - ipam: - config: - - subnet: {} - gateway: {} - nebula-net-base: - name: nebula-net-base - external: true - {} - {} - {} - """ - ) + # Create the Docker network + base = DockerUtils.create_docker_network(network_name) + + client = docker.from_env() - # Generate the Docker Compose file dynamically - services = "" self.config.participants.sort(key=lambda x: x["device_args"]["idx"]) - for node in self.config.participants: - idx = node["device_args"]["idx"] - path = f"/nebula/app/config/{self.scenario_name}/participant_{idx}.json" - logging.info(f"Starting node {idx} with configuration {path}") - logging.info("Node {} is listening on ip {}".format(idx, node["network_args"]["ip"])) - # Add one service for each participant + i = 2 + container_ids = [] + for idx, node in enumerate(self.config.participants): + image = "nebula-core" + name = 
f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{self.user}-participant{node['device_args']['idx']}" + if node["device_args"]["accelerator"] == "gpu": - logging.info(f"Node {idx} is using GPU") - services += participant_gpu_template.format( - idx, - self.root_path, - "sleep 10" if node["device_args"]["start"] else "sleep 0", - self.scenario.network_gateway, - path, - node["network_args"]["ip"], - "proxy:" if self.scenario.deployment and self.use_blockchain else "", + environment = {"NVIDIA_DISABLE_REQUIRE": True} + host_config = client.api.create_host_config( + binds=[f"{self.root_path}:/nebula", "/var/run/docker.sock:/var/run/docker.sock"], + privileged=True, + device_requests=[docker.types.DeviceRequest(driver="nvidia", count=-1, capabilities=[["gpu"]])], + extra_hosts={"host.docker.internal": "host-gateway"}, ) else: - logging.info(f"Node {idx} is using CPU") - services += participant_template.format( - idx, - self.root_path, - "sleep 10" if node["device_args"]["start"] else "sleep 0", - self.scenario.network_gateway, - path, - node["network_args"]["ip"], - "proxy:" if self.scenario.deployment and self.use_blockchain else "", + environment = "" + host_config = client.api.create_host_config( + binds=[f"{self.root_path}:/nebula", "/var/run/docker.sock:/var/run/docker.sock"], + privileged=True, + device_requests=[], + extra_hosts={"host.docker.internal": "host-gateway"}, ) - docker_compose_file = docker_compose_template.format(services) - docker_compose_file += network_template.format( - self.scenario.network_subnet, - self.scenario.network_gateway, - "proxy:" if self.scenario.deployment and self.use_blockchain else "", - "name: chainnet" if self.scenario.deployment and self.use_blockchain else "", - "external: true" if self.scenario.deployment and self.use_blockchain else "", - ) - # Write the Docker Compose file in config directory - with open(f"{self.config_dir}/docker-compose.yml", "w") as f: - f.write(docker_compose_file) - # Include additional config to the participants - for idx, node in enumerate(self.config.participants): + volumes = ["/nebula", "/var/run/docker.sock"] + + start_command = "sleep 10" if node["device_args"]["start"] else "sleep 0" + command = [ + "/bin/bash", + "-c", + f"{start_command} && ifconfig && echo '{base}.1 host.docker.internal' >> /etc/hosts && python /nebula/nebula/node.py /nebula/app/config/{self.scenario_name}/participant_{node['device_args']['idx']}.json", + ] + + if self.use_blockchain: + networking_config = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config( + ipv4_address=f"{base}.{i}", + ), + f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-net-base": client.api.create_endpoint_config(), + "chainnet": client.api.create_endpoint_config(), + }) + else: + networking_config = client.api.create_networking_config({ + f"{network_name}": client.api.create_endpoint_config( + ipv4_address=f"{base}.{i}", + ), + f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-net-base": client.api.create_endpoint_config(), + }) + node["tracking_args"]["log_dir"] = "/nebula/app/logs" node["tracking_args"]["config_dir"] = f"/nebula/app/config/{self.scenario_name}" node["scenario_args"]["controller"] = self.controller @@ -828,67 +709,32 @@ def start_nodes_docker(self): node["security_args"]["certfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_cert.pem" node["security_args"]["keyfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_key.pem" node["security_args"]["cafile"] = 
"/nebula/app/certs/ca_cert.pem" + node = json.loads(json.dumps(node).replace("192.168.50.", f"{base}.")) # TODO change this # Write the config file in config directory with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f: json.dump(node, f, indent=4) - # Start the Docker Compose file, catch error if any - try: - subprocess.check_call([ - "docker", - "compose", - "-f", - f"{self.config_dir}/docker-compose.yml", - "up", - "--build", - "-d", - ]) - except subprocess.CalledProcessError: - raise Exception( - "Docker Compose failed to start, please check if Docker Compose is installed (https://docs.docker.com/compose/install/) and Docker Engine is running." - ) - - container_ids = None - logging.info("Waiting for nodes to start...") - # Loop until all containers are running (equivalent to the number of participants) - while container_ids is None or len(container_ids) != len(self.config.participants): - time.sleep(3) try: - # Obtain docker ids - result = subprocess.run( - [ - "docker", - "compose", - "-f", - f"{self.config_dir}/docker-compose.yml", - "ps", - "-q", - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, + container_id = client.api.create_container( + image=image, + name=name, + detach=True, + volumes=volumes, + environment=environment, + command=command, + host_config=host_config, + networking_config=networking_config, ) + except Exception as e: + logging.exception(f"Creating container {name}: {e}") - if result.returncode != 0: - raise Exception(f"Error obtaining docker IDs: {result.stderr}") - - container_ids = result.stdout.strip().split("\n") - - except subprocess.CalledProcessError: - raise Exception( - "Docker Compose failed to start, please check if Docker Compose is installed " - "(https://docs.docker.com/compose/install/) and Docker Engine is running." - ) - - # Change log and config directory in dockers to /nebula/app, and change controller endpoint - for idx, node in enumerate(self.config.participants): - # Assign docker ID to node - node["device_args"]["docker_id"] = container_ids[idx] - - # Write the config file in config directory - with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f: - json.dump(node, f, indent=4) + try: + client.api.start(container_id) + container_ids.append(container_id) + except Exception as e: + logging.exception(f"Starting participant {name} error: {e}") + i += 1 def start_nodes_process(self): logging.info("Starting nodes as processes...") @@ -948,9 +794,9 @@ def start_nodes_process(self): commands += 'Write-Host "All nodes started. PIDs stored in $PID_FILE"\n' - with open("/nebula/app/config/current_scenario_commands.ps1", "w") as f: + with open(f"/nebula/app/config/{self.scenario_name}/current_scenario_commands.ps1", "w") as f: f.write(commands) - os.chmod("/nebula/app/config/current_scenario_commands.ps1", 0o755) + os.chmod(f"/nebula/app/config/{self.scenario_name}/current_scenario_commands.ps1", 0o755) else: commands = '#!/bin/bash\n\nPID_FILE="$(dirname "$0")/current_scenario_pids.txt"\n\n> $PID_FILE\n\n' sorted_participants = sorted( @@ -970,9 +816,9 @@ def start_nodes_process(self): commands += 'echo "All nodes started. 

-            with open("/nebula/app/config/current_scenario_commands.sh", "w") as f:
+            with open(f"/nebula/app/config/{self.scenario_name}/current_scenario_commands.sh", "w") as f:
                 f.write(commands)
-            os.chmod("/nebula/app/config/current_scenario_commands.sh", 0o755)
+            os.chmod(f"/nebula/app/config/{self.scenario_name}/current_scenario_commands.sh", 0o755)

         except Exception as e:
             raise Exception(f"Error starting nodes as processes: {e}")

@@ -980,7 +826,7 @@ def start_nodes_process(self):
     @classmethod
     def remove_files_by_scenario(cls, scenario_name):
         try:
-            shutil.rmtree(Utils.check_path(os.environ["NEBULA_CONFIG_DIR"], scenario_name))
+            shutil.rmtree(FileUtils.check_path(os.environ["NEBULA_CONFIG_DIR"], scenario_name))
         except FileNotFoundError:
             logging.warning("Files not found, nothing to remove")
         except Exception as e:
@@ -988,21 +834,21 @@ def remove_files_by_scenario(cls, scenario_name):
             logging.exception(e)
             raise e
         try:
-            shutil.rmtree(Utils.check_path(os.environ["NEBULA_LOGS_DIR"], scenario_name))
+            shutil.rmtree(FileUtils.check_path(os.environ["NEBULA_LOGS_DIR"], scenario_name))
         except PermissionError:
             # Avoid error if the user does not have enough permissions to remove the tf.events files
             logging.warning("Not enough permissions to remove the files, moving them to tmp folder")
             os.makedirs(
-                Utils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
+                FileUtils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
                 exist_ok=True,
             )
             os.chmod(
-                Utils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
+                FileUtils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
                 0o777,
             )
             shutil.move(
-                Utils.check_path(os.environ["NEBULA_LOGS_DIR"], scenario_name),
-                Utils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
+                FileUtils.check_path(os.environ["NEBULA_LOGS_DIR"], scenario_name),
+                FileUtils.check_path(os.environ["NEBULA_ROOT"], os.path.join("app", "tmp", scenario_name)),
             )
         except FileNotFoundError:
             logging.warning("Files not found, nothing to remove")
@@ -1039,11 +885,12 @@ def scenario_finished(self, timeout_seconds):
     @classmethod
     def generate_statistics(cls, path):
         try:
-            # Generate statistics
-            logging.info(f"Generating statistics for scenario {path}")
-
             # Define input directories
             input_event_dirs = sorted(glob.glob(os.path.join(path, "metrics/*")))
+            if not input_event_dirs:
+                return False
+            # Generate statistics
+            logging.info(f"Generating statistics for scenario {path}")
             # Where to write reduced TB events
             tb_events_output_dir = os.path.join(path, "metrics", "reduced-data")
             csv_out_path = os.path.join(path, "metrics", "reduced-data-as.csv")
diff --git a/nebula/utils.py b/nebula/utils.py
new file mode 100644
index 00000000..dddda225
--- /dev/null
+++ b/nebula/utils.py
@@ -0,0 +1,156 @@
+import logging
+import os
+import socket
+
+import docker
+
+
+class FileUtils:
+    @classmethod
+    def check_path(cls, base_path, relative_path):
+        full_path = os.path.normpath(os.path.join(base_path, relative_path))
+        base_path = os.path.normpath(base_path)
+
+        if not full_path.startswith(base_path):
+            raise Exception("Not allowed")
+        return full_path
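FileUtils.check_path is the same path-traversal guard the deleted nebula/frontend/utils.py provided, now shared project-wide. A quick illustration with hypothetical paths:

    from nebula.utils import FileUtils

    print(FileUtils.check_path("/nebula/app/logs", "scenario_a/participant_0.log"))
    # /nebula/app/logs/scenario_a/participant_0.log

    try:
        FileUtils.check_path("/nebula/app/logs", "../../etc/passwd")
    except Exception as e:
        print(e)  # "Not allowed" -- the normalized path escapes the base directory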
+
+
+class SocketUtils:
+    @classmethod
+    def is_port_open(cls, port):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            s.bind(("127.0.0.1", port))
+            s.close()
+            return True
+        except OSError:
+            return False
+
+    @classmethod
+    def find_free_port(cls, start_port=49152, end_port=65535):
+        for port in range(start_port, end_port + 1):
+            if cls.is_port_open(port):
+                return port
+        return None
+
+
+class DockerUtils:
+    @classmethod
+    def create_docker_network(cls, network_name, subnet=None, prefix=24):
+        try:
+            # Connect to Docker
+            client = docker.from_env()
+            base_subnet = "192.168"
+
+            # Obtain existing docker subnets
+            existing_subnets = []
+            networks = client.networks.list()
+
+            existing_network = next((n for n in networks if n.name == network_name), None)
+
+            if existing_network:
+                ipam_config = existing_network.attrs.get("IPAM", {}).get("Config", [])
+                if ipam_config:
+                    # Assume there's only one subnet per network for simplicity
+                    existing_subnet = ipam_config[0].get("Subnet", "")
+                    potential_base = ".".join(existing_subnet.split(".")[:3])  # Extract base from subnet
+                    logging.info(f"Network '{network_name}' already exists with base {potential_base}")
+                    return potential_base
+
+            for network in networks:
+                ipam_config = network.attrs.get("IPAM", {}).get("Config", [])
+                if ipam_config:
+                    for config in ipam_config:
+                        if "Subnet" in config:
+                            existing_subnets.append(config["Subnet"])
+
+            # If no subnet is provided or it exists, find the next available one
+            if not subnet or subnet in existing_subnets:
+                for i in range(50, 255):  # Iterate over 192.168.50.0 to 192.168.254.0
+                    subnet = f"{base_subnet}.{i}.0/{prefix}"
+                    potential_base = f"{base_subnet}.{i}"
+                    if subnet not in existing_subnets:
+                        break
+                else:
+                    raise ValueError("No available subnets found.")
+
+            # Create the Docker network
+            gateway = f"{subnet.split('/')[0].rsplit('.', 1)[0]}.1"
+            ipam_pool = docker.types.IPAMPool(subnet=subnet, gateway=gateway)
+            ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool])
+            network = client.networks.create(name=network_name, driver="bridge", ipam=ipam_config)
+
+            logging.info(f"Network created: {network.name} with subnet {subnet}")
+            return potential_base
+
+        except docker.errors.APIError:
+            logging.exception("Error interacting with Docker")
+            return None
+        except Exception:
+            logging.exception("Unexpected error")
+            return None
+        finally:
+            client.close()  # Ensure the Docker client is closed
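create_docker_network hands back the first three octets of the allocated /24 (e.g. "192.168.50") rather than a network object; start_nodes_docker interpolates that base into the gateway and per-participant addresses. A hedged usage sketch (assumes a reachable Docker daemon; the network name is hypothetical):

    from nebula.utils import DockerUtils

    base = DockerUtils.create_docker_network("nebula-alice-nebula-net-scenario")
    if base is not None:
        gateway = f"{base}.1"            # matches the gateway chosen above
        first_participant = f"{base}.2"  # start_nodes_docker assigns .2, .3, ... to nodes
        print(base, gateway, first_participant)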
+
+    @classmethod
+    def remove_docker_network(cls, network_name):
+        try:
+            # Connect to Docker
+            client = docker.from_env()
+
+            # Get the network by name
+            network = client.networks.get(network_name)
+
+            # Remove the network
+            network.remove()
+
+            logging.info(f"Network {network_name} removed successfully.")
+        except docker.errors.NotFound:
+            logging.exception(f"Network {network_name} not found.")
+        except docker.errors.APIError:
+            logging.exception("Error interacting with Docker")
+        except Exception:
+            logging.exception("Unexpected error")
+
+    @classmethod
+    def remove_docker_networks_by_prefix(cls, prefix):
+        try:
+            # Connect to Docker
+            client = docker.from_env()
+
+            # List all networks
+            networks = client.networks.list()
+
+            # Filter and remove networks with names starting with the prefix
+            for network in networks:
+                if network.name.startswith(prefix):
+                    network.remove()
+                    logging.info(f"Network {network.name} removed successfully.")
+
+        except docker.errors.NotFound:
+            logging.info(f"One or more networks with prefix {prefix} not found.")
+        except docker.errors.APIError:
+            logging.info("Error interacting with Docker")
+        except Exception:
+            logging.info("Unexpected error")
+
+    @classmethod
+    def remove_containers_by_prefix(cls, prefix):
+        try:
+            # Connect to Docker client
+            client = docker.from_env()
+
+            containers = client.containers.list(all=True)  # `all=True` to include stopped containers
+
+            # Iterate through containers and remove those with the matching prefix
+            for container in containers:
+                if container.name.startswith(prefix):
+                    logging.info(f"Removing container: {container.name}")
+                    container.remove(force=True)  # force=True to stop and remove if running
+                    logging.info(f"Container {container.name} removed successfully.")
+
+        except docker.errors.APIError:
+            logging.exception("Error interacting with Docker")
+        except Exception:
+            logging.exception("Unexpected error")
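These removal helpers are what stop_scenario in nebula/frontend/app.py calls to tear a user's deployment down. A hedged usage sketch (controller name and user are hypothetical placeholders):

    import os
    from nebula.utils import DockerUtils

    # Hypothetical values; in the frontend these come from the environment and session
    os.environ.setdefault("NEBULA_CONTROLLER_NAME", "nebula")
    user = "alice"

    prefix = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{user}-participant"
    DockerUtils.remove_containers_by_prefix(prefix)  # e.g. nebula-alice-participant0, -participant1, ...
    DockerUtils.remove_docker_network(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{user}-nebula-net-scenario")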
diff --git a/pyproject.toml b/pyproject.toml
index f068ea94..8bf81d32 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -61,6 +61,27 @@ docs = [
     "mkdocs-section-index<1.0.0,>=0.3.9",
     "mkdocstrings[python]<1.0.0,>=0.26.2",
 ]
+controller = [
+    "ansi2html==1.9.2",
+    "docker==7.1.0",
+    "fastapi[all]==0.114.0",
+    "geopy==2.4.1",
+    "gunicorn==23.0.0",
+    "jinja2==3.1.4",
+    "matplotlib==3.9.2",
+    "networkx==3.3",
+    "openpyxl==3.1.5",
+    "pandas==2.2.2",
+    "plotly==5.24.0",
+    "protobuf==4.25.3",
+    "psutil==6.0.0",
+    "pyopenssl==24.2.1",
+    "python-dotenv==1.0.1",
+    "requests==2.32.3",
+    "setuptools==74.1.2",
+    "uvicorn==0.30.6",
+    "wheel==0.44.0",
+]
 core = [
     "aiohttp==3.10.5",
     "ansi2html==1.9.2",