Add benchmarking script to the scheduler (#2071)
Co-authored-by: ammar92 <[email protected]>
jpbruinsslot and ammar92 authored Dec 6, 2023
1 parent 9877f53 commit dadbf82
Showing 3 changed files with 194 additions and 4 deletions.
2 changes: 2 additions & 0 deletions mula/tests/scripts/.gitignore
@@ -0,0 +1,2 @@
data.csv
logs.txt
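
(Both ignored files are artifacts of the scripts below: benchmark.py writes captured container logs to logs.txt, and load.py reads its input from data.csv.)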
171 changes: 171 additions & 0 deletions mula/tests/scripts/benchmark.py
@@ -0,0 +1,171 @@
import argparse
import logging
import subprocess
import threading
import time
from pathlib import Path

import requests

SCHEDULER_API = "http://localhost:8004"
TIMEOUT_FOR_LOG_CAPTURE = 5

logger = logging.getLogger(__name__)


def are_tasks_done() -> bool:
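    # The stats endpoint reports per-hour buckets; the run is considered done
    # once no bucket has tasks left in the queued state.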
response = requests.get(
url=f"{SCHEDULER_API}/tasks/stats",
)

try:
response.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("Error getting tasks")
raise

tasks_stats = response.json()

    return all(tasks_stats[hour].get("queued", 0) <= 0 for hour in tasks_stats)


def parse_stats() -> None:
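    # Fetch the per-hour task counters from the scheduler and log them.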
resp_tasks_stats = requests.get(
url=f"{SCHEDULER_API}/tasks/stats",
)

try:
resp_tasks_stats.raise_for_status()
except requests.exceptions.HTTPError:
logger.error("Error getting tasks")
raise

tasks_stats = resp_tasks_stats.json()
for hour in tasks_stats:
queued = tasks_stats[hour].get("queued")
running = tasks_stats[hour].get("running")
failed = tasks_stats[hour].get("failed")
completed = tasks_stats[hour].get("completed")

logger.info(
"HOUR %s, QUEUED %s, RUNNING %s, FAILED %s, COMPLETED %s",
hour,
queued,
running,
failed,
completed,
)


def capture_logs(container_id: str, output_file: str) -> None:
    # Dump the container's logs to a file so they can be scanned for errors afterwards
    with Path(output_file).open("w", encoding="utf-8") as file:
subprocess.run(
["docker", "logs", container_id],
stdout=file,
stderr=file,
check=True,
)


def parse_logs(path: str) -> None:
# Check if there were any errors in the logs
count = 0
    with Path(path).open(encoding="utf-8") as file:
for line in file:
            if line.startswith(("ERROR", "Traceback")):
count += 1
logger.info(line)

if count > 0:
logger.error("Found %d errors in the logs", count)


def collect_cpu(container_id: str) -> str:
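    # docker stats' {{.CPUPerc}} template renders a value such as "12.34%";
    # strip the trailing percent sign to return a bare number.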
return (
subprocess.run(
["docker", "stats", "--no-stream", "--format", "{{.CPUPerc}}", container_id],
capture_output=True,
check=True,
)
.stdout.decode("utf-8")
.strip("%\n")
)


def collect_memory(container_id: str) -> str:
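    # {{.MemUsage}} renders "used / limit" (e.g. "123.4MiB / 1.944GiB"); keep
    # the usage part and strip the MiB unit (assumes MiB-scale usage).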
return (
subprocess.run(
["docker", "stats", "--no-stream", "--format", "{{.MemUsage}}", container_id],
capture_output=True,
check=True,
)
.stdout.decode("utf-8")
.split("/")[0]
.strip("MiB\n")
)


def run(container_id: str | None) -> None:
    # Start capturing logs in the background (only when a container id was given)
    thread = None
    if container_id is not None:
        thread = threading.Thread(target=capture_logs, args=(container_id, "logs.txt"))
        thread.start()

    # Wait for tasks to finish, sampling container resource usage while we wait
    while not are_tasks_done():
        logger.debug("Tasks are not done yet")

        if container_id is not None:
            cpu = collect_cpu(container_id)
            memory = collect_memory(container_id)
            logger.info("CPU %s, MEMORY %s", cpu, memory)

        # Parse stats
        parse_stats()

        time.sleep(10)

    logger.debug("Tasks are done")

    # Stop capturing logs
    if thread is not None:
        thread.join(timeout=TIMEOUT_FOR_LOG_CAPTURE)

# Parse stats
parse_stats()

# Parse logs
parse_logs("logs.txt")


if __name__ == "__main__":
# Setup command line interface
parser = argparse.ArgumentParser(description="Benchmark the scheduler.")

# Add arguments
parser.add_argument("--verbose", "-v", action="store_true", help="Set to enable verbose logging.")

parser.add_argument(
"--container-id",
"-c",
type=str,
required=False,
help="The container id of the process to monitor.",
)

# Parse arguments
args = parser.parse_args()

    # Configure the logging level: the -v (verbose) flag sets the level to
    # DEBUG (printing all debug messages and higher); without it the level
    # defaults to INFO.
    level = logging.INFO
    if args.verbose:
        level = logging.DEBUG

logging.basicConfig(
level=level,
format="%(asctime)s %(name)-10s %(levelname)-8s %(message)s",
)

run(args.container_id)
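
Note: the script assumes the scheduler's /tasks/stats endpoint returns one bucket per hour, each carrying queued, running, failed and completed counters, since that is the shape are_tasks_done() and parse_stats() read. A minimal sketch of that assumed payload (the hour keys and counts are hypothetical):

example_stats = {
    "2023-12-06T14:00": {"queued": 3, "running": 1, "failed": 0, "completed": 12},
    "2023-12-06T15:00": {"queued": 0, "running": 0, "failed": 1, "completed": 40},
}

# are_tasks_done() only returns True once every hourly bucket has drained its queue
print(all(bucket.get("queued", 0) <= 0 for bucket in example_stats.values()))  # prints False

A typical invocation, assuming the scheduler is reachable on localhost:8004 and the scheduler container's id is known: python3 benchmark.py --container-id <container-id> --verbose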
25 changes: 21 additions & 4 deletions mula/tests/scripts/load.py
@@ -1,3 +1,4 @@
import argparse
import csv
import uuid
from datetime import datetime, timezone
@@ -11,10 +12,10 @@
SCHEDULER_API = "http://localhost:8004"


def run():
def run(org_num: int = 1):
# Create organisations
orgs: List[Dict[str, Any]] = []
for n in range(1, 10):
for n in range(0, org_num):
org = {
"id": f"org-{n}",
"name": f"Organisation {n}",
@@ -63,7 +64,14 @@ def run():
print("Enabled boefje ", boefje_id)

declarations: List[Dict[str, Any]] = []
with Path("data.csv").open(newline="") as csv_file:

# Check if data file exists
if not Path("data.csv").exists():
print("data.csv file not found")
return

data_file = Path("data.csv").open(newline="", encoding="utf-8")
with data_file as csv_file:
csv_reader = csv.DictReader(csv_file, delimiter=",", quotechar='"')
for row in csv_reader:
name = row["name"]
@@ -120,4 +128,13 @@ def run():


if __name__ == "__main__":
run()
# Setup command line interface
parser = argparse.ArgumentParser(description="Load test the scheduler")

# Add arguments
parser.add_argument("--orgs", type=int, default=1, help="Number of organisations to create")

# Parse arguments
args = parser.parse_args()

run(org_num=args.orgs)
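
The loader now exits early when data.csv is missing. Only the "name" column is referenced in the part of the diff shown here, so a minimal compatible input can be generated like this (the hostnames are hypothetical):

import csv
from pathlib import Path

# Write a throwaway data.csv with the single column the loader is known to read.
rows = [{"name": "example.com"}, {"name": "example.org"}]
with Path("data.csv").open("w", newline="", encoding="utf-8") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=["name"])
    writer.writeheader()
    writer.writerows(rows)

With that file in place, python3 load.py --orgs 5 creates organisations org-0 through org-4 and then processes the CSV rows.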
