diff --git a/runner/collect_results.py b/runner/collect_results.py
index b6c158c..f39df8b 100755
--- a/runner/collect_results.py
+++ b/runner/collect_results.py
@@ -20,7 +20,7 @@ def dir_path(string):
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument("out_file")
-    parser.add_argument("src_path", required=False, type=dir_path)
+    parser.add_argument("src_path", nargs="?", default=None, type=dir_path)
     parser.add_argument("--raw", dest="raw", action="store_true")
     args = parser.parse_args()
 
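
Note on the collect_results.py hunk: argparse rejects required= for positional arguments with a TypeError, so nargs="?" with a default is the supported way to make a positional optional. A minimal sketch of the resulting behavior, with made-up file names and the dir_path validator omitted:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("out_file")
parser.add_argument("src_path", nargs="?", default=None)

print(parser.parse_args(["results.csv"]))
# -> Namespace(out_file='results.csv', src_path=None)
print(parser.parse_args(["results.csv", "data/"]))
# -> Namespace(out_file='results.csv', src_path='data/')
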
diff --git a/runner/make-graphics.py b/runner/make-graphics.py
new file mode 100755
index 0000000..e78d008
--- /dev/null
+++ b/runner/make-graphics.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+
+from typing import Iterable, List, Tuple
+import pandas as pd
+import matplotlib.pyplot as plt
+import matplotlib
+import numpy as np
+import argparse
+
+DEFAULT_YLIM = 1000
+
+FONT = {"family": "serif", "size": 18}
+LARGE_FONT = 28
+
+STYLES = [
+    ("o", "yellow", "orange"),
+    ("*", "brown", "brown"),
+    ("x", "teal", "teal"),
+    ("+", "pink", "red"),
+    ("*", "magenta", "magenta"),
+    ("v", "blue", "purple"),
+    (".", "orange", "orange"),
+    ("x", "cyan", "green"),
+]
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("src_paths", nargs="+")
+    parser.add_argument("out_path")
+    parser.add_argument("--uri", dest="uri", action="store_true")
+    args = parser.parse_args()
+    df = load_df(args.src_paths)
+    render(df, args.out_path)
+
+
+def load_df(src_paths: List[str]) -> pd.DataFrame:
+    dataframes = []
+    for src_path in src_paths:
+        dataframes.append(pd.read_csv(src_path))
+        dataframes[-1]["src_path"] = [src_path] * len(dataframes[-1].index)
+    df = pd.concat(dataframes)
+    df["runtime_version"] = [
+        f"{target.replace('lf-', '').upper()} {scheduler}{src_path.split('.')[0].split('-')[-1]}"
+        for src_path, scheduler, target in zip(
+            df.src_path,
+            (
+                [scheduler + " " for scheduler in df.scheduler]
+                if "scheduler" in df.columns
+                else [""] * len(df.index)
+            ),
+            df.target,
+        )
+    ]
+    return df
+
+
+def compute_legend(runtime_versions: Iterable[str]) -> List[Tuple[str, str, str, str]]:
+    assert len(STYLES) >= len(runtime_versions)
+    return [(a, *b) for a, b in zip(runtime_versions, STYLES)]
+
+
+def render(df: pd.DataFrame, out_path: str):
+    matplotlib.rc("font", **FONT)
+    fig, axes = plt.subplots(6, 4)
+    fig.set_size_inches(30, 45)
+    axes = axes.ravel()
+    x = sorted(list(df.threads.unique()))
+    df_numbers = df[np.isfinite(df.mean_time_ms)]
+    for ax, benchmark in zip(axes, sorted(list(df.benchmark.unique()))):
+        df_benchmark = df_numbers[df_numbers.benchmark == benchmark]
+        top = 1.3 * df_benchmark[np.isfinite(df_benchmark.mean_time_ms)].mean_time_ms.max()
+        if pd.isna(top):
+            top = DEFAULT_YLIM
+        for version, marker, linecolor, markercolor in compute_legend(
+            df.runtime_version.unique()
+        ):
+            df_benchmark_scheduler = df_benchmark[
+                df_benchmark.runtime_version == version
+            ]
+            ax.set_title(benchmark)
+            ax.set_xticks(x)
+            ax.set_ylim(bottom=0, top=top)
+            (line,) = ax.plot(
+                x,
+                [
+                    df_benchmark_scheduler[
+                        df_benchmark_scheduler.threads == threads
+                    ].mean_time_ms.mean()
+                    for threads in x
+                ],
+                marker=marker,
+                ms=12,
+                linewidth=2,
+                c=linecolor,
+                markeredgecolor=markercolor,
+            )
+            line.set_label(version)
+        ax.legend()
+    ax = fig.add_subplot(111, frameon=False)
+    ax.xaxis.label.set_fontsize(LARGE_FONT)
+    ax.yaxis.label.set_fontsize(LARGE_FONT)
+    ax.title.set_fontsize(LARGE_FONT)
+    ax.set_facecolor("white")
+    plt.rc("font", size=LARGE_FONT)
+    plt.tick_params(labelcolor="none", top=False, bottom=False, left=False, right=False)
+    plt.title("Comparison of Scheduler Versions\n")
+    plt.xlabel("Number of Threads")
+    plt.ylabel("Mean Time (milliseconds)\n")
+    fig.patch.set_facecolor("white")
+    fig.savefig(out_path, transparent=False)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/runner/requirements.txt b/runner/requirements.txt
index 67b180b..0f26ab5 100644
--- a/runner/requirements.txt
+++ b/runner/requirements.txt
@@ -1,3 +1,4 @@
 hydra-core>=1.2.0
 cogapp
+matplotlib
 pandas
diff --git a/runner/run_benchmark.py b/runner/run_benchmark.py
index 9f6894f..501c595 100755
--- a/runner/run_benchmark.py
+++ b/runner/run_benchmark.py
@@ -4,8 +4,11 @@
 import hydra
 import logging
 import multiprocessing
+import numpy as np
 import omegaconf
 import subprocess
+from queue import Empty, Queue
+from threading import Thread
 
 log = logging.getLogger("run_benchmark")
 
@@ -72,16 +75,17 @@ def resolve_args(config_key):
     for step in ["prepare", "copy", "gen", "compile"]:
         if target[step] is not None:
             _, code = execute_command(target[step])
-            check_return_code(code, continue_on_error)
+            if not check_return_code(code, continue_on_error):
+                return
 
     # run the benchmark
     if target["run"] is not None:
+        cmd = omegaconf.OmegaConf.to_object(target["run"])
         if test_mode:
             # run the command with a timeout of 1 second. We only want to test
             # if the command executes correctly, not if the full benchmark runs
             # correctly as this would take too long
-            cmd = omegaconf.OmegaConf.to_object(target["run"])
-            _, code = execute_command(["timeout", "1"] + cmd)
+            _, code = execute_command(["timeout", "1"] + cmd, 2)
             # timeout returns 124 if the command executed correctly but the
             # timeout was exceeded
             if code != 0 and code != 124:
@@ -89,9 +93,16 @@
                     f"Command returned with non-zero exit code ({code})"
                 )
         else:
-            output, code = execute_command(target["run"])
+            output, code = execute_command(
+                cmd,
+                cfg["timeout"],
+                cfg["stacktrace"] if "stacktrace" in cfg else False
+            )
+            if code == 124:
+                log.error(f"The command \"{' '.join([str(word) for word in cmd])}\" timed out.")
             check_return_code(code, continue_on_error)
             times = hydra.utils.call(target["parser"], output)
+            times += [np.inf] * (cfg["iterations"] - len(times))
             write_results(times, cfg)
     else:
         raise ValueError(f"No run command provided for target {target_name}")
@@ -107,6 +118,7 @@ def check_return_code(code, continue_on_error):
         raise RuntimeError(
             f"Command returned with non-zero exit code ({code})"
         )
+    return code == 0
 
 
 def check_benchmark_target_config(benchmark, target_name):
@@ -131,7 +143,7 @@ def check_benchmark_target_config(benchmark, target_name):
     return True
 
 
-def execute_command(command):
+def command_to_list(command):
    # the command can be a list of lists due to the way we use an omegaconf
    # resolver to determine the arguments. We need to flatten the command list
    # first. We also need to touch each element individually to make sure that
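
Note on the exit-code convention used in the hunks above: GNU coreutils timeout(1) exits with status 124 when the limit is hit and otherwise forwards the wrapped command's exit status, which is why the test-mode path accepts both 0 and 124, and why execute_command (rewritten below) reports its own timeouts as 124. A quick sketch, assuming GNU coreutils is installed:

import subprocess

print(subprocess.run(["timeout", "1", "true"]).returncode)       # 0: finished within the limit
print(subprocess.run(["timeout", "1", "sleep", "5"]).returncode) # 124: limit exceeded
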
@@ -142,25 +154,58 @@
         if isinstance(i, list):
             cmd.extend(i)
         else:
             cmd.append(str(i))
+    return cmd
+
+
+def enqueue_output(out, queue):
+    while True:
+        line = out.readline()
+        queue.put(line)
+        if not line:
+            break
+    out.close()
+
+
+def execute_command(command, timeout=None, stacktrace=False):
+    cmd = command_to_list(command)
     cmd_str = " ".join(cmd)
     log.info(f"run command: {cmd_str}")
-
     # run the command while printing and collecting its output
     output = []
     with subprocess.Popen(
-        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, text=True
     ) as process:
+        q = Queue()
+        t = Thread(target=enqueue_output, args=(process.stdout, q))
+        t.daemon = True
+        t.start()
         cmd_log = logging.getLogger(command[0])
-        while True:
-            nextline = process.stdout.readline()
-            if nextline == "" and process.poll() is not None:
-                break
-            elif nextline != "":
-                output.append(nextline)
-                cmd_log.info(nextline.rstrip())
-
-        code = process.returncode
+        try:
+            line = q.get(timeout=timeout)
+            while line:
+                if not line.isspace():
+                    output.append(line)
+                    cmd_log.info(line.rstrip())
+                line = q.get(timeout=timeout)
+            code = process.wait(timeout=timeout)
+        except (Empty, subprocess.TimeoutExpired):
+            cmd_log.error(f"{cmd_str} timed out.")
+            if stacktrace:
+                cmd_log.info(
+                    "We may need to ask you for sudo access in order to get a stacktrace."
+                )
+                completed_stacktrace = subprocess.run(
+                    ["sudo", "eu-stack", "-p", str(process.pid)],
+                    capture_output=True
+                )
+                if completed_stacktrace.returncode != 0:
+                    cmd_log.error("Failed to debug the timed-out process.")
+                for line in (
+                    completed_stacktrace.stdout.decode().splitlines()
+                    + completed_stacktrace.stderr.decode().splitlines()
+                ):
+                    cmd_log.error(line)
+            process.kill()
+            return output, 124
     return output, code
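
The rewritten execute_command works around the fact that readline() on a pipe cannot time out: a daemon thread pumps the child's output into a Queue, and every q.get(timeout=...) bounds how long the parent waits for the next line. A minimal, self-contained sketch of that pattern; the child command here is a stand-in, and 124 mirrors the timeout(1) convention used above:

import subprocess
from queue import Empty, Queue
from threading import Thread


def pump(stream, queue):
    # forward each line to the queue; readline() returns "" at EOF
    for line in iter(stream.readline, ""):
        queue.put(line)
    queue.put("")  # EOF sentinel unblocks the consumer
    stream.close()


child = ["python3", "-u", "-c", "import time\nfor i in range(3): print(i); time.sleep(0.2)"]
with subprocess.Popen(
    child, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
) as proc:
    q = Queue()
    Thread(target=pump, args=(proc.stdout, q), daemon=True).start()
    try:
        line = q.get(timeout=5)  # queue.Empty is raised if the child goes silent
        while line:
            print(line.rstrip())
            line = q.get(timeout=5)
        code = proc.wait(timeout=5)
    except (Empty, subprocess.TimeoutExpired):
        proc.kill()
        code = 124  # mirror timeout(1)'s convention
print("exit code:", code)
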