
Hook the benchmark runner and benchmark visualizations into GitHub #30

Draft: wants to merge 15 commits into base: main

Changes from 13 commits
4 changes: 2 additions & 2 deletions runner/collect_results.py
@@ -10,8 +10,8 @@
import json


-def dir_path(string):
-    if os.path.isdir(string):
+def src_path_type(string):
+    if os.path.isdir(string) or string == "latest":
         return string
     else:
         raise NotADirectoryError(string)

Contributor (review comment on the renamed src_path_type): This shouldn't change anymore, right? Since dir_path is now an optional argument and we don't explicitly use 'latest'.
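For context on the comment above: a function like this is typically handed to argparse as a type validator. Below is a minimal sketch of that wiring; the --src-path argument name and its "latest" default are assumptions for illustration, not taken from collect_results.py.

import argparse
import os


def src_path_type(string):
    # Accept an existing directory or the sentinel value "latest".
    if os.path.isdir(string) or string == "latest":
        return string
    else:
        raise NotADirectoryError(string)


parser = argparse.ArgumentParser()
# Hypothetical wiring: an optional argument that defaults to "latest".
parser.add_argument("--src-path", type=src_path_type, default="latest")
args = parser.parse_args([])
print(args.src_path)  # -> "latest"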
114 changes: 114 additions & 0 deletions runner/make-graphics.py
@@ -0,0 +1,114 @@
#!/usr/bin/env python3

from typing import Iterable, List, Tuple
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import argparse

DEFAULT_YLIM = 1000

FONT = {"family": "serif", "size": 18}
LARGE_FONT = 28

# (marker, line color, marker edge color) used for each runtime version.
STYLES = [
    ("o", "yellow", "orange"),
    ("*", "brown", "brown"),
    ("x", "teal", "teal"),
    ("+", "pink", "red"),
    ("*", "magenta", "magenta"),
    ("v", "blue", "purple"),
    (".", "orange", "orange"),
    ("x", "cyan", "green"),
]


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("src_paths", nargs="+")
    parser.add_argument("out_path")
    parser.add_argument("--uri", dest="uri", action="store_true")  # parsed but not used below
    args = parser.parse_args()
    df = load_df(args.src_paths)
    render(df, args.out_path)


def load_df(src_paths: List[str]) -> pd.DataFrame:
    dataframes = []
    for src_path in src_paths:
        dataframes.append(pd.read_csv(src_path))
        dataframes[-1]["src_path"] = [src_path] * len(dataframes[-1].index)
    df = pd.concat(dataframes)
    # Label each row with a human-readable runtime version built from the target,
    # the scheduler (if present), and the last dash-separated token of the file name.
    df["runtime_version"] = [
        f"{target.replace('lf-', '').upper()} {scheduler}{src_path.split('.')[0].split('-')[-1]}"
        for src_path, scheduler, target in zip(
            df.src_path,
            (
                [scheduler + " " for scheduler in df.scheduler]
                if "scheduler" in df.columns else [""] * len(df.index)
            ),
            df.target
        )
    ]
    return df


def compute_legend(runtime_versions: Iterable[str]) -> List[Tuple[str, str, str, str]]:
    assert len(STYLES) >= len(runtime_versions)
    return [(a, *b) for a, b in zip(runtime_versions, STYLES)]


def render(df: pd.DataFrame, out_path: str):
    matplotlib.rc("font", **FONT)
    # One subplot per benchmark, one line per runtime version.
    fig, axes = plt.subplots(6, 4)
    fig.set_size_inches(30, 45)
    axes = axes.ravel()
    x = sorted(list(df.threads.unique()))
    df_numbers = df[np.isfinite(df.mean_time_ms)]
    for ax, benchmark in zip(axes, sorted(list(df.benchmark.unique()))):
        df_benchmark = df_numbers[df_numbers.benchmark == benchmark]
        top = 1.3 * df_benchmark[np.isfinite(df_benchmark.mean_time_ms)].mean_time_ms.max()
        if pd.isna(top):
            top = DEFAULT_YLIM
        for version, marker, linecolor, markercolor in compute_legend(
            df.runtime_version.unique()
        ):
            df_benchmark_scheduler = df_benchmark[
                df_benchmark.runtime_version == version
            ]
            ax.set_title(benchmark)
            ax.set_xticks(x)
            ax.set_ylim(bottom=0, top=top)
            (line,) = ax.plot(
                x,
                [
                    df_benchmark_scheduler[
                        df_benchmark_scheduler.threads == threads
                    ].mean_time_ms.mean()
                    for threads in x
                ],
                marker=marker,
                ms=12,
                linewidth=2,
                c=linecolor,
                markeredgecolor=markercolor,
            )
            line.set_label(version)
        ax.legend()
    # Invisible full-figure axes that only carries the shared title and axis labels.
    ax = fig.add_subplot(111, frameon=False)
    ax.xaxis.label.set_fontsize(LARGE_FONT)
    ax.yaxis.label.set_fontsize(LARGE_FONT)
    ax.title.set_fontsize(LARGE_FONT)
    ax.set_facecolor("white")
    plt.rc("font", size=LARGE_FONT)
    plt.tick_params(labelcolor="none", top=False, bottom=False, left=False, right=False)
    plt.title("Comparison of Scheduler Versions\n")
    plt.xlabel("Number of Threads")
    plt.ylabel("Mean Time (milliseconds)\n")
    fig.patch.set_facecolor("white")
    fig.savefig(out_path, transparent=False)


if __name__ == "__main__":
    main()
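As a usage note (illustrative, not part of the diff): the script takes one or more result CSVs followed by an output image path, per the argparse definition above. The file names below are placeholders; real result files come from the benchmark runner and must contain the columns the code reads (benchmark, threads, mean_time_ms, target, and optionally scheduler).

import subprocess

# Placeholder result files; the last dash-separated token of each file name
# ends up in the "runtime_version" legend label built by load_df().
subprocess.run(
    [
        "python3",
        "runner/make-graphics.py",
        "results-lf-c-sched1.csv",
        "results-lf-cpp-sched2.csv",
        "comparison.png",
    ],
    check=True,
)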
1 change: 1 addition & 0 deletions runner/requirements.txt
@@ -1,3 +1,4 @@
hydra-core>=1.2.0
cogapp
matplotlib
pandas
77 changes: 61 additions & 16 deletions runner/run_benchmark.py
@@ -4,8 +4,11 @@
 import hydra
 import logging
 import multiprocessing
+import numpy as np
 import omegaconf
 import subprocess
+from queue import Empty, Queue
+from threading import Thread


 log = logging.getLogger("run_benchmark")
@@ -72,26 +75,34 @@ def resolve_args(config_key):
     for step in ["prepare", "copy", "gen", "compile"]:
         if target[step] is not None:
             _, code = execute_command(target[step])
-            check_return_code(code, continue_on_error)
+            if not check_return_code(code, continue_on_error):
+                return

     # run the benchmark
     if target["run"] is not None:
+        cmd = omegaconf.OmegaConf.to_object(target["run"])
         if test_mode:
             # run the command with a timeout of 1 second. We only want to test
             # if the command executes correctly, not if the full benchmark runs
             # correctly as this would take too long
-            cmd = omegaconf.OmegaConf.to_object(target["run"])
-            _, code = execute_command(["timeout", "1"] + cmd)
+            _, code = execute_command(["timeout", "1"] + cmd, 2)
             # timeout returns 124 if the command executed correctly but the
             # timeout was exceeded
             if code != 0 and code != 124:
                 raise RuntimeError(
                     f"Command returned with non-zero exit code ({code})"
                 )
         else:
-            output, code = execute_command(target["run"])
+            output, code = execute_command(
+                cmd,
+                cfg["timeout"],
+                cfg["stacktrace"] if "stacktrace" in cfg else False
+            )
+            if code == 124:
+                log.error(f"The command \"{' '.join([str(word) for word in cmd])}\" timed out.")
             check_return_code(code, continue_on_error)
             times = hydra.utils.call(target["parser"], output)
+            times += [np.infty] * (cfg["iterations"] - len(times))
             write_results(times, cfg)
     else:
         raise ValueError(f"No run command provided for target {target_name}")
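A side note on the exit-code convention referenced in the comment above (illustrative; not part of the PR): GNU timeout exits with 124 when the time limit is hit, which can be checked directly. "sleep 5" below is just a stand-in for a benchmark command.

import subprocess

# Give the command a 1-second budget; it needs 5, so timeout kills it.
code = subprocess.run(["timeout", "1", "sleep", "5"]).returncode
print(code)  # 124 on systems with GNU coreutils' timeout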
@@ -107,6 +118,7 @@ def check_return_code(code, continue_on_error):
         raise RuntimeError(
             f"Command returned with non-zero exit code ({code})"
         )
+    return code == 0

 def check_benchmark_target_config(benchmark, target_name):
     benchmark_name = benchmark["name"]
@@ -131,7 +143,7 @@ def check_benchmark_target_config(benchmark, target_name):
     return True


-def execute_command(command):
+def command_to_list(command):
     # the command can be a list of lists due to the way we use an omegaconf
     # resolver to determine the arguments. We need to flatten the command list
     # first. We also need to touch each element individually to make sure that
@@ -142,25 +154,58 @@ def execute_command(command):
             cmd.extend(i)
         else:
             cmd.append(str(i))
+    return cmd
+
+
+def enqueue_output(out, queue):
+    while True:
+        line = out.readline()
+        queue.put(line)
+        if not line:
+            break
+    out.close()
+
+
+def execute_command(command, timeout=None, stacktrace=False):
(Review thread on the new execute_command signature)

Contributor: Could you provide some context on why you changed this and what exactly it does?

@petervdonovan (Contributor, Author) commented on Jul 26, 2022: I wanted to read the output of the process in a non-blocking way so that we could capture the stack trace of the running process before killing it. By default, the stack trace is not captured because the user may not have eu-stack, but I wanted this feature so that, ideally, it might be possible to debug a deadlock in CI without having to manually run the executable hundreds of times.
+    cmd = command_to_list(command)
     cmd_str = " ".join(cmd)
     log.info(f"run command: {cmd_str}")

     # run the command while printing and collecting its output
     output = []
     with subprocess.Popen(
-        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, text=True
     ) as process:
+        q = Queue()
+        t = Thread(target=enqueue_output, args=(process.stdout, q))
+        t.daemon = True
+        t.start()
         cmd_log = logging.getLogger(command[0])
-        while True:
-            nextline = process.stdout.readline()
-            if nextline == "" and process.poll() is not None:
-                break
-            elif nextline != "":
-                output.append(nextline)
-                cmd_log.info(nextline.rstrip())
-
-        code = process.returncode
+        try:
+            line = q.get(timeout=timeout)
+            while line:
+                if line and not line.isspace():
+                    output.append(line)
+                    cmd_log.info(line.rstrip())
+                line = q.get(timeout=timeout)
+            code = process.wait(timeout=timeout)
+        except (Empty, subprocess.TimeoutExpired):
+            cmd_log.error(f"{cmd_str} timed out.")
+            completed_stacktrace = None
+            if stacktrace:
+                cmd_log.info("We may need to ask you for sudo access in order to get a stacktrace.")
+                completed_stacktrace = subprocess.run(
+                    ["sudo", "eu-stack", "-p", str(process.pid)],
+                    capture_output=True
+                )
+            process.kill()
+            if completed_stacktrace is not None and completed_stacktrace.returncode != 0:
+                cmd_log.error("Failed to debug the timed-out process.")
+                for line in (
+                    completed_stacktrace.stdout.decode().splitlines()
+                    + completed_stacktrace.stderr.decode().splitlines()
+                ):
+                    cmd_log.error(line)
+            return (output, 124)

     return output, code
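To make the review discussion above concrete, here is a self-contained sketch (not from the PR) of the non-blocking read pattern the new execute_command relies on: a daemon thread pumps the child's stdout into a Queue, and the parent treats a Queue.get timeout as a hang. The command and timeout below are arbitrary examples, and the eu-stack step is omitted.

import subprocess
from queue import Empty, Queue
from threading import Thread


def pump(stream, queue):
    # Forward lines to the queue; an empty string marks end-of-file.
    for line in iter(stream.readline, ""):
        queue.put(line)
    queue.put("")
    stream.close()


def run_with_timeout(cmd, timeout):
    output = []
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    ) as process:
        q = Queue()
        Thread(target=pump, args=(process.stdout, q), daemon=True).start()
        try:
            # As long as lines keep arriving within the timeout, collect them.
            while (line := q.get(timeout=timeout)):
                output.append(line)
            return output, process.wait(timeout=timeout)
        except (Empty, subprocess.TimeoutExpired):
            # No output (or no exit) within the timeout: treat the child as hung,
            # kill it, and mimic GNU timeout's exit code 124.
            process.kill()
            return output, 124


if __name__ == "__main__":
    out, code = run_with_timeout(["echo", "hello"], timeout=5)
    print(code, "".join(out), end="")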