From ed23f59ea88a9c591daa77bcdc6c1db46866babc Mon Sep 17 00:00:00 2001
From: Tobias Pape
Date: Tue, 1 Oct 2019 16:24:37 +0200
Subject: [PATCH 1/9] Robustify subprocess handling

Python supports timeouts on subprocesses directly from 3.2 on;
subprocess32 backports that support to Python 2.
---
 rebench/configurator.py                  |   3 +-
 rebench/executor.py                      | 109 +++++++++----
 rebench/interop/time_adapter.py          |   2 +-
 rebench/persistence.py                   |   3 +-
 rebench/subprocess_with_timeout.py       | 190 -----------------------
 rebench/tests/executor_test.py           |   5 +-
 rebench/tests/subprocess_timeout_test.py |  75 ---------
 setup.py                                 |   3 +-
 8 files changed, 88 insertions(+), 302 deletions(-)
 delete mode 100644 rebench/subprocess_with_timeout.py
 delete mode 100644 rebench/tests/subprocess_timeout_test.py

diff --git a/rebench/configurator.py b/rebench/configurator.py
index cde033b6..69444038 100644
--- a/rebench/configurator.py
+++ b/rebench/configurator.py
@@ -17,7 +17,8 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 # IN THE SOFTWARE.
-import subprocess
+import subprocess32 as subprocess
+
 from os.path import dirname
 
 from .model.experiment import Experiment
diff --git a/rebench/executor.py b/rebench/executor.py
index d33a644f..43a9f543 100644
--- a/rebench/executor.py
+++ b/rebench/executor.py
@@ -26,14 +26,65 @@
 import os
 import pkgutil
 import random
-import subprocess
 import sys
 from threading import Thread, RLock
 from time import time
-
-from humanfriendly.compat import coerce_string
+import errno
+import subprocess32 as subprocess
+
+
+def make_subprocess_runner():
+    # subprocess.run is superb, but we do not have access
+    # to the process objects in case we need to kill them...
+    # (stripped to what's needed here)
+
+    meta = (set(), RLock())  # poor man's closure
+    def _put(what):
+        with meta[1]:  # lock
+            meta[0].add(what)  # set
+    def _pop(what):
+        with meta[1]:  # lock
+            meta[0].discard(what)  # set
+    def _walk(fun, *args, **kwargs):
+        with meta[1]:  # lock
+            fun(meta[0], *args, **kwargs)  # set
+    def _run(*args, **kwargs):
+        input = kwargs.pop('input', None)
+        timeout = kwargs.pop('timeout', None)
+        if input is not None:
+            assert 'stdin' not in kwargs
+            kwargs['stdin'] = subprocess.PIPE
+        with subprocess.Popen(*args, **kwargs) as process:
+            try:
+                _put(process)
+                stdout, stderr = process.communicate(input, timeout=timeout)
+            except subprocess.TimeoutExpired:
+                process.kill()
+                stdout, stderr = process.communicate()
+                raise subprocess.TimeoutExpired(process.args, timeout, output=stdout,
+                                                stderr=stderr)
+            except:
+                process.kill()
+                process.wait()
+                raise
+            finally:
+                _pop(process)
+            retcode = process.poll()
+        return subprocess.CompletedProcess(process.args, retcode, stdout, stderr)
+    return _run, _walk
+subprocess_run, walk_processes = make_subprocess_runner()
+
+def terminate_processes():
+    def _signal(procs):
+        for proc in procs:
+            if proc is not None and proc.poll() is None:
+                proc.kill()
+    walk_processes(_signal)
+
+from humanfriendly.compat import coerce_string, is_unicode
+def _maybe_decode(what):
+    if not is_unicode(what):
+        return coerce_string(what.decode('utf-8'))
+    return what
 
-from .
import subprocess_with_timeout as subprocess_timeout from .interop.adapter import ExecutionDeliveredNoResults from .ui import escape_braces @@ -337,15 +388,11 @@ def _execute_build_cmd(self, build_command, name, run_id): self._scheduler.indicate_build(run_id) self._ui.debug_output_info("Start build\n", None, script, path) - def _keep_alive(seconds): - self._ui.warning( - "Keep alive. current job runs since %dmin\n" % (seconds / 60), run_id, script, path) - try: - return_code, stdout_result, stderr_result = subprocess_timeout.run( - '/bin/sh', path, False, True, - stdin_input=str.encode(script), - keep_alive_output=_keep_alive) + result = subprocess_run('/bin/sh', + cwd=path, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=False, input=script.encode('utf-8')) except OSError as err: build_command.mark_failed() run_id.fail_immediately() @@ -361,17 +408,17 @@ def _keep_alive(seconds): self._ui.error(msg, run_id, script, path) return - stdout_result = coerce_string(stdout_result.decode('utf-8')) - stderr_result = coerce_string(stderr_result.decode('utf-8')) + stdout_result = _maybe_decode(result.stdout) + stderr_result = _maybe_decode(result.stderr) if self._build_log: self.process_output(name, stdout_result, stderr_result) - if return_code != 0: + if result.returncode != 0: build_command.mark_failed() run_id.fail_immediately() run_id.report_run_failed( - script, return_code, "Build of " + name + " failed.") + script, result.returncode, "Build of " + name + " failed.") self._ui.error("{ind}Build of " + name + " failed.\n", None, script, path) if stdout_result and stdout_result.strip(): lines = escape_braces(stdout_result).split('\n') @@ -446,20 +493,16 @@ def _get_gauge_adapter_instance(self, adapter_name): def _generate_data_point(self, cmdline, gauge_adapter, run_id, termination_check): # execute the external program here - + timed_out = False try: self._ui.debug_output_info("{ind}Starting run\n", run_id, cmdline) - - def _keep_alive(seconds): - self._ui.warning( - "Keep alive. 
current job runs since %dmin\n" % (seconds / 60), run_id, cmdline) - - (return_code, output, _) = subprocess_timeout.run( + result = subprocess_run( cmdline, cwd=run_id.location, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, shell=True, verbose=self._debug, - timeout=run_id.max_invocation_time, - keep_alive_output=_keep_alive) - output = output.decode('utf-8') + stderr=subprocess.STDOUT, + shell=True, timeout=run_id.max_invocation_time, + start_new_session=True) + output = _maybe_decode(result.stdout) + return_code = result.returncode except OSError as err: run_id.fail_immediately() if err.errno == 2: @@ -470,9 +513,13 @@ def _keep_alive(seconds): msg = str(err) self._ui.error(msg, run_id, cmdline) return True + except subprocess.TimeoutExpired as expired: + timed_out = True + output = _maybe_decode(expired.stdout) + return_code = errno.ETIME if return_code != 0 and not self._include_faulty and not ( - return_code == subprocess_timeout.E_TIMEOUT and run_id.ignore_timeouts): + timed_out and run_id.ignore_timeouts): run_id.indicate_failed_execution() run_id.report_run_failed(cmdline, return_code, output) if return_code == 126: @@ -480,7 +527,7 @@ def _keep_alive(seconds): + "{ind}{ind}The file may not be marked as executable.\n" + "{ind}Return code: %d\n") % ( run_id.benchmark.suite.executor.name, return_code) - elif return_code == subprocess_timeout.E_TIMEOUT: + elif timed_out: msg = ("{ind}Run timed out.\n" + "{ind}{ind}Return code: %d\n" + "{ind}{ind}max_invocation_time: %s\n") % ( diff --git a/rebench/interop/time_adapter.py b/rebench/interop/time_adapter.py index 352bb78a..a7ae12bc 100644 --- a/rebench/interop/time_adapter.py +++ b/rebench/interop/time_adapter.py @@ -18,7 +18,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import re -import subprocess +import subprocess32 as subprocess from .adapter import GaugeAdapter, OutputNotParseable from ..model.data_point import DataPoint from ..model.measurement import Measurement diff --git a/rebench/persistence.py b/rebench/persistence.py index 9c87cb3d..06a64d7b 100644 --- a/rebench/persistence.py +++ b/rebench/persistence.py @@ -19,8 +19,9 @@ # IN THE SOFTWARE. 
import os import shutil -import subprocess import sys +import subprocess32 as subprocess + from datetime import datetime from tempfile import NamedTemporaryFile from threading import Lock diff --git a/rebench/subprocess_with_timeout.py b/rebench/subprocess_with_timeout.py deleted file mode 100644 index 65c118e7..00000000 --- a/rebench/subprocess_with_timeout.py +++ /dev/null @@ -1,190 +0,0 @@ -from __future__ import print_function - -from os import kill -from select import select -from signal import SIGKILL -from subprocess import PIPE, STDOUT, Popen -from threading import Thread, Condition -from time import time - -import sys - -IS_PY3 = None - -try: - _ = ProcessLookupError - IS_PY3 = True -except NameError: - IS_PY3 = False - -# Indicate timeout with standard exit code -E_TIMEOUT = -9 - - -class _SubprocessThread(Thread): - - def __init__(self, executable_name, args, shell, cwd, verbose, stdout, stderr, stdin_input): - Thread.__init__(self, name="Subprocess %s" % executable_name) - self._args = args - self._shell = shell - self._cwd = cwd - self._verbose = verbose - self._stdout = stdout - self._stderr = stderr - self._stdin_input = stdin_input - - self._pid = None - self._started_cv = Condition() - - self.stdout_result = None - self.stderr_result = None - self.returncode = None - self._exception = None - - @property - def exception(self): - return self._exception - - def run(self): - try: - self._started_cv.acquire() - stdin = PIPE if self._stdin_input else None - proc = Popen(self._args, shell=self._shell, cwd=self._cwd, - stdin=stdin, stdout=self._stdout, stderr=self._stderr) - self._pid = proc.pid - self._started_cv.notify() - self._started_cv.release() - - if self._stdin_input: - proc.stdin.write(self._stdin_input) - proc.stdin.flush() - - self.process_output(proc) - self.returncode = proc.returncode - except Exception as err: # pylint: disable=broad-except - self._exception = err - - def get_pid(self): - self._started_cv.acquire() - while self._pid is None: - self._started_cv.wait() - self._started_cv.release() - return self._pid - - def process_output(self, proc): - if self._verbose and self._stdout == PIPE and (self._stderr == PIPE or - self._stderr == STDOUT): - self.stdout_result = "" - self.stderr_result = "" - - while True: - reads = [proc.stdout.fileno()] - if self._stderr == PIPE: - reads.append(proc.stderr.fileno()) - ret = select(reads, [], []) - - for file_no in ret[0]: - if file_no == proc.stdout.fileno(): - read = proc.stdout.readline() - sys.stdout.write(read) - self.stdout_result += read - if self._stderr == PIPE and file_no == proc.stderr.fileno(): - read = proc.stderr.readline() - sys.stderr.write(read) - self.stderr_result += read - - if proc.poll() is not None: - break - else: - self.stdout_result, self.stderr_result = proc.communicate() - - -def _print_keep_alive(seconds_since_start): - print("Keep alive, current job runs for %dmin\n" % (seconds_since_start / 60)) - - -def run(args, cwd=None, shell=False, kill_tree=True, timeout=-1, - verbose=False, stdout=PIPE, stderr=PIPE, stdin_input=None, - keep_alive_output=_print_keep_alive): - """ - Run a command with a timeout after which it will be forcibly - killed. 
- """ - executable_name = args.split(' ')[0] - - thread = _SubprocessThread(executable_name, args, shell, cwd, verbose, stdout, - stderr, stdin_input) - thread.start() - - if timeout == -1: - thread.join() - else: - t10min = 10 * 60 - if timeout < t10min: - thread.join(timeout) - else: - start = time() - diff = 0 - while diff < timeout: - if t10min < timeout - diff: - max_10min_timeout = t10min - else: - max_10min_timeout = timeout - diff - thread.join(max_10min_timeout) - if not thread.is_alive(): - break - diff = time() - start - if diff < timeout: - keep_alive_output(diff) - - if timeout != -1 and thread.is_alive(): - assert thread.get_pid() is not None - return _kill_process(thread.get_pid(), kill_tree, thread) - - if not thread.is_alive(): - exp = thread.exception - if exp: - raise exp # pylint: disable=raising-bad-type - - return thread.returncode, thread.stdout_result, thread.stderr_result - - -def _kill_py2(proc_id): - try: - kill(proc_id, SIGKILL) - except IOError: - # it's a race condition, so let's simply ignore it - pass - - -def _kill_py3(proc_id): - try: - kill(proc_id, SIGKILL) - except ProcessLookupError: # pylint: disable=undefined-variable - # it's a race condition, so let's simply ignore it - pass - - -def _kill_process(pid, recursively, thread): - pids = [pid] - if recursively: - pids.extend(_get_process_children(pid)) - - for proc_id in pids: - if IS_PY3: - _kill_py3(proc_id) - else: - _kill_py2(proc_id) - - thread.join() - - return E_TIMEOUT, thread.stdout_result, thread.stderr_result - - -def _get_process_children(pid): - proc = Popen('pgrep -P %d' % pid, shell=True, stdout=PIPE, stderr=PIPE) - stdout, _stderr = proc.communicate() - result = [int(p) for p in stdout.split()] - for child in result[:]: - result.extend(_get_process_children(child)) - return result diff --git a/rebench/tests/executor_test.py b/rebench/tests/executor_test.py index 5a522fa8..e6e410b4 100644 --- a/rebench/tests/executor_test.py +++ b/rebench/tests/executor_test.py @@ -18,8 +18,9 @@ # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. import unittest -import subprocess import os +import sys +import subprocess32 as subprocess from .persistence import TestPersistence from .rebench_test_case import ReBenchTestCase @@ -192,7 +193,7 @@ def test_determine_exp_name_and_filters_only_others(self): self.assertEqual(exp_filter, ['e:bar', 's:b']) -def Popen_override(cmdline, stdout, stdin=None, stderr=None, cwd=None, shell=None): # pylint: disable=unused-argument +def Popen_override(cmdline, *args, **kwargs): # pylint: disable=unused-argument class Popen(object): returncode = 0 diff --git a/rebench/tests/subprocess_timeout_test.py b/rebench/tests/subprocess_timeout_test.py deleted file mode 100644 index 9fc788a8..00000000 --- a/rebench/tests/subprocess_timeout_test.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (c) 2009-2014 Stefan Marr -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. 
-# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -import os -import subprocess -import unittest - -from .. import subprocess_with_timeout as sub - - -class SubprocessTimeoutTest(unittest.TestCase): - """Test the support for execution of programs with timeouts""" - - def setUp(self): - self._path = os.path.dirname(os.path.realpath(__file__)) - - def test_normal_exec_no_timeout(self): - cmdline = "sleep 1; echo Done" - (return_code, output, err) = sub.run(cmdline, cwd=self._path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - shell=True, timeout=10) - - output = output.decode('utf-8') - self.assertEqual(0, return_code) - self.assertEqual("Done\n", output) - self.assertEqual(None, err) - - def test_exec_with_timeout(self): - cmdline = "sleep 100; echo Done" - (return_code, output, err) = sub.run(cmdline, cwd=self._path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - shell=True, timeout=1) - - output = output.decode('utf-8') - self.assertEqual(sub.E_TIMEOUT, return_code) - self.assertEqual("", output) - self.assertEqual(None, err) - - def test_exec_with_timeout_python_interpreter(self): - cmdline = "python -c \"while True: pass\"" - (return_code, output, err) = sub.run(cmdline, cwd=self._path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - shell=True, timeout=5) - - output = output.decode('utf-8') - self.assertEqual(sub.E_TIMEOUT, return_code) - self.assertEqual("", output) - self.assertEqual(None, err) - - -def test_suite(): - return unittest.makeSuite(SubprocessTimeoutTest) - - -if __name__ == "__main__": - unittest.main(defaultTest='test_suite') diff --git a/setup.py b/setup.py index 48f47ce4..61bb2c91 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,8 @@ install_requires=[ 'PyYAML>=3.12', 'pykwalify>=1.6.1', - 'humanfriendly>=4.12' + 'humanfriendly>=4.12', + 'subprocess32>=3.5.0' ], entry_points = { 'console_scripts' : ['rebench = rebench.rebench:main_func'] From 5ac081c4f99dc1faa5b518439a43cc61b3407f93 Mon Sep 17 00:00:00 2001 From: Tobias Pape Date: Tue, 1 Oct 2019 16:29:23 +0200 Subject: [PATCH 2/9] Code defaults via default parameters. Easier to read. 
Also, `timeout=None` because that's what modern subprocess expects --- rebench/model/exp_run_details.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rebench/model/exp_run_details.py b/rebench/model/exp_run_details.py index d1e22987..dfc66d89 100644 --- a/rebench/model/exp_run_details.py +++ b/rebench/model/exp_run_details.py @@ -54,13 +54,13 @@ def empty(cls): @classmethod def default(cls, invocations_override, iterations_override): - return ExpRunDetails(1, 1, None, 50, -1, None, None, True, 0, - invocations_override, iterations_override) + return ExpRunDetails(invocations_override=invocations_override, + iterations_override=iterations_override) - def __init__(self, invocations, iterations, warmup, min_iteration_time, - max_invocation_time, ignore_timeouts, parallel_interference_factor, - execute_exclusively, retries_after_failure, - invocations_override, iterations_override): + def __init__(self, invocations=1, iterations=1, warmup=None, min_iteration_time=50, + max_invocation_time=None, ignore_timeouts=None, parallel_interference_factor=None, + execute_exclusively=True, retries_after_failure=0, + invocations_override=None, iterations_override=None): self._invocations = invocations self._iterations = iterations self._warmup = warmup From d46f79ca8598012b476628ff4e23ddb349b090f3 Mon Sep 17 00:00:00 2001 From: Tobias Pape Date: Tue, 1 Oct 2019 16:30:39 +0200 Subject: [PATCH 3/9] utilize humanfriendly where it helps --- rebench/tests/features/issue_81_unicode_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rebench/tests/features/issue_81_unicode_test.py b/rebench/tests/features/issue_81_unicode_test.py index e801db23..b975a57e 100644 --- a/rebench/tests/features/issue_81_unicode_test.py +++ b/rebench/tests/features/issue_81_unicode_test.py @@ -21,6 +21,7 @@ import os from codecs import open as open_with_enc +from humanfriendly.compat import unichr from ...configurator import Configurator, load_config from ...executor import Executor @@ -53,10 +54,7 @@ def test_building(self): with open_with_enc(self._path + '/build.log', 'r', encoding='utf8') as build_file: log = build_file.read() - try: - unicode_char = unichr(22234) - except NameError: - unicode_char = chr(22234) + unicode_char = unichr(22234) self.assertGreaterEqual(15, log.find(unicode_char)) # Executor:VM1|STD: self.assertGreaterEqual(log.find(unicode_char, 16), 36) # Executor:VM1|ERR: From 5ee64a06734f3a278ab6afdf81535701f472e81e Mon Sep 17 00:00:00 2001 From: Tobias Pape Date: Tue, 1 Oct 2019 16:31:33 +0200 Subject: [PATCH 4/9] Robustify process handling in presence of threads There are many caveats, but at least Ctrl-C now works as expected. 
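The crux of the fix: on Python 2.7 an unbounded Thread.join() blocks
signal delivery, so the main thread never sees the KeyboardInterrupt.
The pattern, sketched in isolation with illustrative names (not the
exact code in the diff below):

```python
def join_interruptibly(threads, stop_event):
    # join in one-second slices so the main thread stays responsive to
    # KeyboardInterrupt; a bare join() would block signals on py2.7
    pending = list(threads)
    try:
        while pending:
            pending[-1].join(1)
            if not pending[-1].is_alive():  # reap finished threads
                pending.pop()
    except KeyboardInterrupt:
        stop_event.set()  # ask the workers to wind down cooperatively
        raise
```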
---
 rebench/executor.py | 59 ++++++++++++++++++++++++++++++---------------
 1 file changed, 40 insertions(+), 19 deletions(-)

diff --git a/rebench/executor.py b/rebench/executor.py
index 43a9f543..114d303a 100644
--- a/rebench/executor.py
+++ b/rebench/executor.py
@@ -26,9 +26,9 @@
 import os
 import pkgutil
 import random
 import sys
-from threading import Thread, RLock
+from threading import Thread, RLock, Event
 from time import time
 import errno
 import subprocess32 as subprocess
@@ -114,7 +117,7 @@ def _filter_out_completed_runs(runs, ui):
     def number_of_uncompleted_runs(runs, ui):
         return len(RunScheduler._filter_out_completed_runs(runs, ui))
 
-    def _process_remaining_runs(self, runs):
+    def _process_remaining_runs(self, runs, stop_signalled=None):
         """Abstract, to be implemented"""
 
     def _estimate_time_left(self):
@@ -162,23 +165,27 @@ def execute(self):
 
 class BatchScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs):
+    def _process_remaining_runs(self, runs, stop_signalled=None):
         for run_id in runs:
             try:
                 completed = False
-                while not completed:
+                while not completed and \
+                        (stop_signalled is None or not stop_signalled.is_set()):
                     completed = self._executor.execute_run(run_id)
                     self._indicate_progress(completed, run_id)
             except FailedBuilding:
                 pass
+            if stop_signalled is not None and stop_signalled.is_set():
+                break
 
 
 class RoundRobinScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs):
+    def _process_remaining_runs(self, runs, stop_signalled=None):
         task_list = deque(runs)
 
-        while task_list:
+        while task_list and \
+                (stop_signalled is None or not stop_signalled.is_set()):
             try:
                 run = task_list.popleft()
                 completed = self._executor.execute_run(run)
@@ -194,8 +201,9 @@ class RandomScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs):
+    def _process_remaining_runs(self, runs, stop_signalled=None):
         task_list = list(runs)
-        while task_list:
+        while task_list and \
+                (stop_signalled is None or not stop_signalled.is_set()):
             run = random.choice(task_list)
             try:
                 completed = self._executor.execute_run(run)
@@ -207,23 +215,27 @@ class BenchmarkThread(Thread):
 
-    def __init__(self, par_scheduler, num):
+    def __init__(self, par_scheduler, num, exceptions):
         Thread.__init__(self, name="BenchmarkThread %d" % num)
         self._par_scheduler = par_scheduler
         self._id = num
-        self.exception = None
+        self.exceptions = exceptions
+        self.should_stop = Event()
+
+    def tell_stop(self):
+        self.should_stop.set()
 
     def run(self):
         try:
             scheduler = self._par_scheduler.get_local_scheduler()
-            while True:
+            while not self.should_stop.is_set():
                 work = self._par_scheduler.acquire_work()
                 if work is None:
                     return
-                scheduler._process_remaining_runs(work)
+                scheduler._process_remaining_runs(work, stop_signalled=self.should_stop)
         except BaseException as exp:
-            self.exception = exp
+            self.exceptions.append(exp)
 
 
 class BenchmarkThreadExceptions(Exception):
@@ -269,23 +281,32 @@ def _process_sequential_runs(self, runs):
 
     def _process_remaining_runs(self, runs):
         self._remaining_work = self._process_sequential_runs(runs)
-
-        self._worker_threads = [BenchmarkThread(self, i)
+        exceptions = deque()  # shared
+        self._worker_threads = [BenchmarkThread(self, i, exceptions)
                                 for i in range(self._num_worker_threads)]
 
         for thread in self._worker_threads:
             thread.start()
 
-        exceptions = []
-        for thread in self._worker_threads:
-            thread.join()
-            if thread.exception is not None:
-                exceptions.append(thread.exception)
+        try:
+            # this is to work around python2.7 Thread.join
+            # https://stackoverflow.com/a/53560721/1197440
+            threads = self._worker_threads[:]
+            while threads:
+                threads[-1].join(1)  # allow KeyboardInterrupt once per second
+                if not threads[-1].is_alive():  # reap dead threads
+                    threads.pop()
+        except KeyboardInterrupt:
+            self._ui.debug_error_info("Waiting for cleanup\n")
+            for thread in self._worker_threads:
+                thread.tell_stop()
+            terminate_processes()
+            raise
 
         if exceptions:
             if len(exceptions) == 1:
                 raise exceptions[0]
-            raise BenchmarkThreadExceptions(exceptions)
+            raise BenchmarkThreadExceptions(list(exceptions))
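Patches 1 and 4 combine into one pattern: every spawned process is
tracked in a lock-protected registry while it runs, and the Ctrl-C
path sweeps that registry. A condensed, self-contained sketch with
simplified names (the real code is make_subprocess_runner and
terminate_processes in patch 1):

```python
import subprocess32 as subprocess
from threading import RLock

_processes, _lock = set(), RLock()

def run_tracked(*popen_args, **popen_kwargs):
    # register the process while it runs so it can be killed from outside
    process = subprocess.Popen(*popen_args, **popen_kwargs)
    with _lock:
        _processes.add(process)
    try:
        return process.wait()
    finally:
        with _lock:
            _processes.discard(process)

def kill_tracked():
    # called from the KeyboardInterrupt handler, after the worker
    # threads have been told to stop
    with _lock:
        for process in _processes:
            if process.poll() is None:  # still running
                process.kill()
```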
+ """ + common_persist_queue = Queue() + consumer = None + + def __init__(self, benchmark, cores, input_size, var_value): + super(_RunIdParallel, self).__init__(benchmark, cores, input_size, var_value) + self._persist_queue, self._consumer = type(self)._ensure_running() + + def add_data_point(self, data_point, warmup): + self._persist_queue.put((self, data_point, warmup)) + + @classmethod + def _ensure_running(cls): + if cls.consumer is None: + cls.consumer = Thread(target=cls.process_data_points, + name="DataPoint persister") + cls.consumer.daemon = True + cls._register_shutdown() + cls.consumer.start() + return cls.common_persist_queue, cls.consumer + + @classmethod + def _ensure_stopped(cls): + if cls.consumer and cls.consumer.is_alive(): + cls.common_persist_queue.put(None) # stop marker + cls.consumer.join() + cls.consumer = None + + @classmethod + def _register_shutdown(cls): + import atexit, signal + atexit.register(cls._ensure_stopped) + + @classmethod + def process_data_points(cls): + q = cls.common_persist_queue + while True: + item = q.get() + if item is None: # end marker + q.task_done() # drain + break + self, data_point, warmup = item + RunId.add_data_point(self, data_point, warmup) + q.task_done() + diff --git a/rebench/persistence.py b/rebench/persistence.py index 06a64d7b..05b2b975 100644 --- a/rebench/persistence.py +++ b/rebench/persistence.py @@ -58,7 +58,7 @@ def create_run_id(self, benchmark, cores, input_size, var_value): if var_value == '': var_value = None - run = RunId(benchmark, cores, input_size, var_value) + run = RunId.make(benchmark, cores, input_size, var_value) if run in self._run_ids: return self._run_ids[run] else: From cc6b5f2b10e1926699ba5e6014f8c0e112d66277 Mon Sep 17 00:00:00 2001 From: Tobias Pape Date: Thu, 3 Oct 2019 10:34:46 +0200 Subject: [PATCH 6/9] Revert changes to executor as to not interfere with set expectations --- rebench/executor.py | 56 +++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 37 deletions(-) diff --git a/rebench/executor.py b/rebench/executor.py index 114d303a..33d678a0 100644 --- a/rebench/executor.py +++ b/rebench/executor.py @@ -117,7 +117,7 @@ def _filter_out_completed_runs(runs, ui): def number_of_uncompleted_runs(runs, ui): return len(RunScheduler._filter_out_completed_runs(runs, ui)) - def _process_remaining_runs(self, runs, stop_signalled=None): + def _process_remaining_runs(self, runs): """Abstract, to be implemented""" def _estimate_time_left(self): @@ -165,27 +165,23 @@ def execute(self): class BatchScheduler(RunScheduler): - def _process_remaining_runs(self, runs, stop_signalled=None): + def _process_remaining_runs(self, runs): for run_id in runs: try: completed = False - while not completed and \ - (stop_signalled is None or not stop_signalled.is_set()): + while not completed: completed = self._executor.execute_run(run_id) self._indicate_progress(completed, run_id) except FailedBuilding: pass - if stop_signalled is not None and stop_signalled.is_set(): - break class RoundRobinScheduler(RunScheduler): - def _process_remaining_runs(self, runs, stop_signalled=None): + def _process_remaining_runs(self, runs): task_list = deque(runs) - while task_list and \ - (stop_signalled is None or not stop_signalled.is_set()): + while task_list: try: run = task_list.popleft() completed = self._executor.execute_run(run) @@ -201,8 +197,7 @@ class RandomScheduler(RunScheduler): def _process_remaining_runs(self, runs): task_list = list(runs) - while task_list and \ - (stop_signalled is None or not 
From cc6b5f2b10e1926699ba5e6014f8c0e112d66277 Mon Sep 17 00:00:00 2001
From: Tobias Pape
Date: Thu, 3 Oct 2019 10:34:46 +0200
Subject: [PATCH 6/9] Revert changes to executor so as not to interfere with
 set expectations
---
 rebench/executor.py | 56 ++++++++++++++-----------------------------
 1 file changed, 19 insertions(+), 37 deletions(-)

diff --git a/rebench/executor.py b/rebench/executor.py
index 114d303a..33d678a0 100644
--- a/rebench/executor.py
+++ b/rebench/executor.py
@@ -117,7 +117,7 @@ def _filter_out_completed_runs(runs, ui):
     def number_of_uncompleted_runs(runs, ui):
         return len(RunScheduler._filter_out_completed_runs(runs, ui))
 
-    def _process_remaining_runs(self, runs, stop_signalled=None):
+    def _process_remaining_runs(self, runs):
         """Abstract, to be implemented"""
 
     def _estimate_time_left(self):
@@ -165,27 +165,23 @@ def execute(self):
 
 class BatchScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs, stop_signalled=None):
+    def _process_remaining_runs(self, runs):
         for run_id in runs:
             try:
                 completed = False
-                while not completed and \
-                        (stop_signalled is None or not stop_signalled.is_set()):
+                while not completed:
                     completed = self._executor.execute_run(run_id)
                     self._indicate_progress(completed, run_id)
             except FailedBuilding:
                 pass
-            if stop_signalled is not None and stop_signalled.is_set():
-                break
 
 
 class RoundRobinScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs, stop_signalled=None):
+    def _process_remaining_runs(self, runs):
         task_list = deque(runs)
 
-        while task_list and \
-                (stop_signalled is None or not stop_signalled.is_set()):
+        while task_list:
             try:
                 run = task_list.popleft()
                 completed = self._executor.execute_run(run)
@@ -201,8 +197,7 @@ class RandomScheduler(RunScheduler):
 
-    def _process_remaining_runs(self, runs, stop_signalled=None):
+    def _process_remaining_runs(self, runs):
         task_list = list(runs)
-        while task_list and \
-                (stop_signalled is None or not stop_signalled.is_set()):
+        while task_list:
             run = random.choice(task_list)
             try:
                 completed = self._executor.execute_run(run)
@@ -215,27 +210,23 @@ class BenchmarkThread(Thread):
 
-    def __init__(self, par_scheduler, num, exceptions):
+    def __init__(self, par_scheduler, num):
         Thread.__init__(self, name="BenchmarkThread %d" % num)
         self._par_scheduler = par_scheduler
         self._id = num
-        self.exceptions = exceptions
-        self.should_stop = Event()
-
-    def tell_stop(self):
-        self.should_stop.set()
+        self.exception = None
 
     def run(self):
         try:
             scheduler = self._par_scheduler.get_local_scheduler()
-            while not self.should_stop.is_set():
+            while True:
                 work = self._par_scheduler.acquire_work()
                 if work is None:
                     return
-                scheduler._process_remaining_runs(work, stop_signalled=self.should_stop)
+                scheduler._process_remaining_runs(work)
         except BaseException as exp:
-            self.exceptions.append(exp)
+            self.exception = exp
 
 
 class BenchmarkThreadExceptions(Exception):
@@ -281,32 +272,23 @@ def _process_sequential_runs(self, runs):
 
     def _process_remaining_runs(self, runs):
         self._remaining_work = self._process_sequential_runs(runs)
-        exceptions = deque()  # shared
-        self._worker_threads = [BenchmarkThread(self, i, exceptions)
+
+        self._worker_threads = [BenchmarkThread(self, i)
                                 for i in range(self._num_worker_threads)]
 
         for thread in self._worker_threads:
            thread.start()
 
-        try:
-            # this is to work around python2.7 Thread.join
-            # https://stackoverflow.com/a/53560721/1197440
-            threads = self._worker_threads[:]
-            while threads:
-                threads[-1].join(1)  # allow KeyboardInterrupt once per second
-                if not threads[-1].is_alive():  # reap dead threads
-                    threads.pop()
-        except KeyboardInterrupt:
-            self._ui.debug_error_info("Waiting for cleanup\n")
-            for thread in self._worker_threads:
-                thread.tell_stop()
-            terminate_processes()
-            raise
+        exceptions = []
+        for thread in self._worker_threads:
+            thread.join()
+            if thread.exception is not None:
+                exceptions.append(thread.exception)
 
         if exceptions:
             if len(exceptions) == 1:
                 raise exceptions[0]
-            raise BenchmarkThreadExceptions(list(exceptions))
+            raise BenchmarkThreadExceptions(exceptions)

From f3012f136cf2d354979dc183fc220fcafeb1257b Mon Sep 17 00:00:00 2001
From: Tobias Pape
Date: Mon, 7 Oct 2019 13:38:08 +0200
Subject: [PATCH 7/9] Make sure we have at least one worker thread
---
 rebench/executor.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/rebench/executor.py b/rebench/executor.py
index 33d678a0..5a42ef4f 100644
--- a/rebench/executor.py
+++ b/rebench/executor.py
@@ -249,7 +249,8 @@ def __init__(self, executor, seq_scheduler_class, ui):
     def _number_of_threads(self):
         # TODO: read the configuration elements!
         non_interference_factor = float(2.5)
-        return int(floor(cpu_count() / non_interference_factor))
+        # make sure we have at least one thread
+        return max(1, int(floor(cpu_count() / non_interference_factor)))
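Patch 7 in numbers: with the hard-coded non-interference factor of
2.5, floor(cores / 2.5) yields zero worker threads on one- and
two-core machines, so parallel runs would never be picked up. The
max(1, ...) guard is exactly that fix:

```python
from math import floor

def number_of_threads(cores, non_interference_factor=2.5):
    # at least one worker, even when the division rounds down to zero
    return max(1, int(floor(cores / non_interference_factor)))

assert number_of_threads(1) == 1   # was 0 before the patch above
assert number_of_threads(2) == 1   # was 0 as well
assert number_of_threads(8) == 3   # floor(8 / 2.5) == 3
```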
From 92e9701e3393ead69df3e76b2afa26d7157caf7f Mon Sep 17 00:00:00 2001
From: Tobias Pape
Date: Mon, 7 Oct 2019 14:20:38 +0200
Subject: [PATCH 8/9] Add queue-based pulling parallel scheduler

The existing parallel scheduler behaves as before, with common
behavior lifted to a new base class. A new parallel scheduler
maintains a work queue from which the workers pull runs directly.
A distinction is made between py2 and py3, to make use of the
built-in thread pool executor where possible.

No new magic so far.
---
 docs/usage.md                     |   2 +-
 rebench/configurator.py           |   9 ++
 rebench/executor.py               | 180 ++++++++++++++++++++------
 rebench/model/run_id.py           |  14 +--
 rebench/persistence.py            |   7 +-
 rebench/rebench.py                |   7 +-
 rebench/tests/executor_test.py    |  18 ++-
 rebench/tests/small-for-pull.conf |  43 +++++++
 8 files changed, 223 insertions(+), 57 deletions(-)
 create mode 100644 rebench/tests/small-for-pull.conf

diff --git a/docs/usage.md b/docs/usage.md
index cdab23bf..99bece1c 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -157,7 +157,7 @@ For this purpose we use *schedulers* to determine the execution order.
 ```text
 -s SCHEDULER, --scheduler=SCHEDULER
                       execution order of benchmarks: batch, round-robin,
-                      random [default: batch]
+                      random, pulling [default: batch]
 ```
 
 #### Prevent Execution to Verify Configuration
diff --git a/rebench/configurator.py b/rebench/configurator.py
index 69444038..ee7417d8 100644
--- a/rebench/configurator.py
+++ b/rebench/configurator.py
@@ -106,6 +106,15 @@ def can_set_niceness():
     else:
         return True
 
+def cpu_count():
+    # memoize cpu count, determination is potentially expensive
+    if not hasattr(cpu_count, 'count'):
+        import multiprocessing
+        cpu_count.count = multiprocessing.cpu_count()
+    return cpu_count.count
+
+def can_parallelize():
+    return cpu_count() > 1
 
 def load_config(file_name):
     """
diff --git a/rebench/executor.py b/rebench/executor.py
index 5a42ef4f..8088edc7 100644
--- a/rebench/executor.py
+++ b/rebench/executor.py
@@ -22,14 +22,16 @@
 from codecs import open as open_with_enc
 from collections import deque
 from math import floor
-from multiprocessing import cpu_count
 import os
 import pkgutil
 import random
 import sys
 from threading import Thread, RLock, Event
-from time import time
+import time
 import errno
 import subprocess32 as subprocess
 
+from .interop.adapter import ExecutionDeliveredNoResults
+from .ui import escape_braces
+from .configurator import cpu_count, can_parallelize
+
 
 def make_subprocess_runner():
@@ -88,8 +91,6 @@ def _maybe_decode(what):
         return coerce_string(what.decode('utf-8'))
     return what
 
-from .interop.adapter import ExecutionDeliveredNoResults
-from .ui import escape_braces
 
 class FailedBuilding(Exception):
@@ -106,7 +107,7 @@ def __init__(self, executor, ui):
         self._executor = executor
         self._ui = ui
         self._runs_completed = 0
-        self._start_time = time()
+        self._start_time = time.time()
         self._total_num_runs = 0
 
     @staticmethod
@@ -118,13 +119,13 @@ def number_of_uncompleted_runs(runs, ui):
         return len(RunScheduler._filter_out_completed_runs(runs, ui))
 
     def _process_remaining_runs(self, runs):
-        """Abstract, to be implemented"""
+        raise NotImplementedError("abstract base class")
 
     def _estimate_time_left(self):
         if self._runs_completed == 0:
             return 0, 0, 0
 
-        current = time()
+        current = time.time()
         time_per_invocation = ((current - self._start_time)
                                / self._runs_completed)
         etl = time_per_invocation * (self._total_num_runs - self._runs_completed)
         sec = etl % 60
@@ -162,6 +163,13 @@ def execute(self):
             self._ui.step_spinner(completed_runs)
         self._process_remaining_runs(runs)
 
+    @classmethod
+    def as_sequential(cls, executor, ui):
+        return cls(executor, ui)
+
+    @classmethod
+    def as_parallel(cls, executor, ui):
+        return ParallelScheduler(executor, cls, ui)
 
 class BatchScheduler(RunScheduler):
@@ -235,25 +243,20 @@ def __init__(self, exceptions):
         super(BenchmarkThreadExceptions, self).__init__()
         self.exceptions = exceptions
 
+class BaseParallelScheduler(RunScheduler):
+    def __init__(self, executor, ui):
+        super(BaseParallelScheduler, self).__init__(executor, ui)
+        self._worker_count = self._number_of_threads()
 
-class ParallelScheduler(RunScheduler):
-
-    def __init__(self, executor, seq_scheduler_class, ui):
-        RunScheduler.__init__(self, executor, ui)
-        self._seq_scheduler_class = seq_scheduler_class
-        self._lock = RLock()
-        self._num_worker_threads = self._number_of_threads()
-        self._remaining_work = None
-        self._worker_threads = None
-
-    def _number_of_threads(self):
+    @classmethod
+    def _number_of_threads(cls):
         # TODO: read the configuration elements!
         non_interference_factor = float(2.5)
         # make sure we have at least one thread
         return max(1, int(floor(cpu_count() / non_interference_factor)))
 
-    @staticmethod
-    def _split_runs(runs):
+    @classmethod
+    def _split_runs(cls, runs):
         seq_runs = []
         par_runs = []
         for run in runs:
@@ -263,19 +266,36 @@ def _split_runs(runs):
             par_runs.append(run)
         return seq_runs, par_runs
 
-    def _process_sequential_runs(self, runs):
+    def _process_remaining_runs(self, runs):
         seq_runs, par_runs = self._split_runs(runs)
+        self._process_sequential_runs(seq_runs)
+        self._process_parallel_runs(par_runs)
+
+    def _process_sequential_runs(self, seq_runs):
+        raise NotImplementedError("abstract base class")
+
+    def _process_parallel_runs(self, par_runs):
+        raise NotImplementedError("abstract base class")
+
+
+class ParallelScheduler(BaseParallelScheduler):
+
+    def __init__(self, executor, seq_scheduler_class, ui):
+        super(ParallelScheduler, self).__init__(executor, ui)
+        self._seq_scheduler_class = seq_scheduler_class
+        self._lock = RLock()
+        self._remaining_work = None
+        self._worker_threads = None
+
+    def _process_sequential_runs(self, seq_runs):
         scheduler = self._seq_scheduler_class(self._executor, self._ui)
         scheduler._process_remaining_runs(seq_runs)
-        return par_runs
-
-    def _process_remaining_runs(self, runs):
-        self._remaining_work = self._process_sequential_runs(runs)
+
+    def _process_parallel_runs(self, par_runs):
+        self._remaining_work = par_runs
 
         self._worker_threads = [BenchmarkThread(self, i)
-                                for i in range(self._num_worker_threads)]
+                                for i in range(self._worker_count)]
 
         for thread in self._worker_threads:
             thread.start()
@@ -295,7 +315,7 @@ def _determine_num_work_items_to_take(self):
         # use a simple and naive scheduling strategy that still allows for
         # different running times, without causing too much scheduling overhead
         k = len(self._remaining_work)
-        per_thread = int(floor(float(k) / float(self._num_worker_threads)))
+        per_thread = int(floor(float(k) / float(self._worker_count)))
         per_thread = max(1, per_thread)  # take at least 1 run
         return per_thread
 
@@ -314,6 +334,99 @@ def acquire_work(self):
                 work.append(self._remaining_work.pop())
         return work
 
+class PullingScheduler(BatchScheduler):
+    # same as the superclass, but with a different parallel counterpart
+    @classmethod
+    def as_parallel(cls, executor, ui):
+        return PullingParallelScheduler(executor, ui)
+
+class PullingParallelScheduler(BaseParallelScheduler):
+
+    def __init__(self, executor, ui):
+        super(PullingParallelScheduler, self).__init__(executor, ui)
+        self._work = self._new_queue()
+        self._should_stop = Event()
+
+    def _new_queue(self):
+        try:
+            import Queue as queue
+        except ImportError:
+            import queue
+        return queue.Queue()
+
+    def _process_sequential_runs(self, seq_runs):
+        scheduler = self.as_sequential(self._executor, self._ui)
+        scheduler._process_remaining_runs(seq_runs)
+
+    def _execute_one_run(self, run_id):
+        completed = False
+        try:
+            completed = self._executor.execute_run(run_id)
+
self._indicate_progress(completed, run_id) + except FailedBuilding: + pass + return completed + + def _pull_and_execute(self, exceptions): + while not self._should_stop.is_set(): + completed = False + run_id = self._work.get() + try: + if run_id is None: + return + completed = self._execute_one_run(run_id) + if not completed: + self._work.put(run_id) + except BaseException as exp: + exceptions.append(exp) + finally: + self._work.task_done() + + + def _process_parallel_runs(self, par_runs): + for run in par_runs: + self._work.put(run) + exceptions = [] + self._process_work(exceptions) + if exceptions: + raise (exceptions[0] if len(exceptions) == 1 \ + else BenchmarkThreadExceptions(exceptions)) + + def _2_process_work(self, exceptions): + workers = [Thread(target=self._pull_and_execute, args=(exceptions,)) + for _ in range(self._worker_count)] + for worker in workers: + worker.daemon = True + worker.start() + try: + self._work.join() + except KeyboardInterrupt: + self._should_stop.set() + raise + for _ in range(self._worker_count): self._work.put(None) + + + + def _3_process_work(self, exceptions): + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=self._worker_count) as executor: + for _ in range(self._worker_count): + executor.submit(self._pull_and_execute, exceptions) + try: + self._work.join() + except KeyboardInterrupt: + self._should_stop.set() + raise + for _ in range(self._worker_count): self._work.put(None) + + if sys.version_info < (3, 3): + _process_work = _2_process_work + else: + _process_work = _3_process_work + + @classmethod + def as_sequential(cls, executor, ui): + return PullingScheduler(executor, ui) class Executor(object): @@ -334,15 +447,10 @@ def __init__(self, runs, use_nice, do_builds, ui, include_faulty=False, def _create_scheduler(self, scheduler): # figure out whether to use parallel scheduler - if cpu_count() > 1: - i = 0 - for run in self._runs: - if not run.execute_exclusively: - i += 1 - if i > 1: - return ParallelScheduler(self, scheduler, self._ui) - - return scheduler(self, self._ui) + if (can_parallelize() and + any(not run.execute_exclusively for run in self._runs)): + return scheduler.as_parallel(self, self._ui) + return scheduler.as_sequential(self, self._ui) def _construct_cmdline(self, run_id, gauge_adapter): cmdline = "" diff --git a/rebench/model/run_id.py b/rebench/model/run_id.py index eab01b0b..5509bc98 100644 --- a/rebench/model/run_id.py +++ b/rebench/model/run_id.py @@ -29,7 +29,7 @@ from .termination_check import TerminationCheck from ..statistics import StatisticProperties from ..ui import UIError - +from ..configurator import can_parallelize class RunId(object): @@ -325,7 +325,7 @@ def from_str_list(cls, data_store, str_list): @staticmethod def make(benchmark, cores, input_size, var_value): - if _should_parallelize(benchmark): + if not benchmark.run_details.execute_exclusively and can_parallelize(): return _RunIdParallel(benchmark, cores, input_size, var_value) return RunId(benchmark, cores, input_size, var_value) @@ -338,16 +338,6 @@ def __str__(self): self._var_value or '', self._benchmark.run_details.warmup or 0) - - -def _should_parallelize(bench_cfg): - # memoize cpu count, detemination is potentially expensive - if not hasattr(_should_parallelize, 'cpus'): - from multiprocessing import cpu_count - _should_parallelize.cpus = cpu_count() - return (not bench_cfg.run_details.execute_exclusively and - _should_parallelize.cpus > 1) - class _RunIdParallel(RunId): """ Like RunId, but add_data_point 
queues the actual diff --git a/rebench/persistence.py b/rebench/persistence.py index 05b2b975..8fc02f21 100644 --- a/rebench/persistence.py +++ b/rebench/persistence.py @@ -230,6 +230,7 @@ def _open_file_to_add_new_data(self): self._file = open(self._data_filename, 'a+') def close(self): - if self._file: - self._file.close() - self._file = None + with self._lock: + if self._file: + self._file.close() + self._file = None diff --git a/rebench/rebench.py b/rebench/rebench.py index 8438f826..539f79bc 100755 --- a/rebench/rebench.py +++ b/rebench/rebench.py @@ -33,7 +33,7 @@ from . import __version__ as rebench_version from .executor import Executor, BatchScheduler, RoundRobinScheduler, \ - RandomScheduler + RandomScheduler, PullingScheduler from .persistence import DataStore from .reporter import CliReporter from .configurator import Configurator, load_config @@ -121,7 +121,7 @@ def shell_options(self): '-s', '--scheduler', action='store', dest='scheduler', default='batch', help='execution order of benchmarks: ' - 'batch, round-robin, random [default: %(default)s]') + 'batch, round-robin, random, pulling [default: %(default)s]') execution.add_argument( '-E', '--no-execution', action='store_true', dest='no_execution', default=False, @@ -223,7 +223,8 @@ def execute_experiment(self, runs): scheduler_class = {'batch': BatchScheduler, 'round-robin': RoundRobinScheduler, - 'random': RandomScheduler}.get(self._config.options.scheduler) + 'random': RandomScheduler, + 'pulling': PulingScheduler,}.get(self._config.options.scheduler) executor = Executor(runs, self._config.use_nice, self._config.do_builds, self._ui, diff --git a/rebench/tests/executor_test.py b/rebench/tests/executor_test.py index e6e410b4..ccb10045 100644 --- a/rebench/tests/executor_test.py +++ b/rebench/tests/executor_test.py @@ -78,7 +78,7 @@ def test_broken_command_format_with_TypeError(self): ex.execute() self.assertIsInstance(err.exception.source_exception, TypeError) - def _basic_execution(self, cnf): + def _basic_execution(self, cnf, *args, **kwargs): runs = cnf.get_runs() self.assertEqual(8, len(runs)) @@ -87,7 +87,7 @@ def _basic_execution(self, cnf): for run in runs: run.add_persistence(persistence) - ex = Executor(runs, cnf.use_nice, cnf.do_builds, TestDummyUI()) + ex = Executor(runs, cnf.use_nice, cnf.do_builds, TestDummyUI(), *args, **kwargs) ex.execute() for run in runs: data_points = persistence.get_data_points(run) @@ -147,6 +147,20 @@ def test_execution_with_invocation_and_iteration_set(self): self.assertEqual(2, run.get_number_of_data_points()) + def test_basic_execution_with_pulling_sequential(self): + from ..executor import PullingScheduler + cnf = Configurator(load_config(self._path + '/small-for-pull.conf'), + DataStore(self._ui), self._ui, None, + exp_name='TestSeq', data_file=self._tmp_file) + self._basic_execution(cnf, scheduler=PullingScheduler) + + def test_basic_execution_with_pulling_parallel(self): + from ..executor import PullingScheduler + cnf = Configurator(load_config(self._path + '/small-for-pull.conf'), + DataStore(self._ui), self._ui, None, + exp_name='TestPar', data_file=self._tmp_file) + self._basic_execution(cnf, scheduler=PullingScheduler) + def test_shell_options_without_filters(self): option_parser = ReBench().shell_options() args = option_parser.parse_args(['-d', '-v', 'some.conf']) diff --git a/rebench/tests/small-for-pull.conf b/rebench/tests/small-for-pull.conf new file mode 100644 index 00000000..d181d58b --- /dev/null +++ b/rebench/tests/small-for-pull.conf @@ -0,0 +1,43 @@ +# Config file 
for ReBench +# Config format is YAML (see http://yaml.org/ for detailed spec) + +# this run definition will be chosen if no parameters are given to rebench.py +default_experiment: TestSeq +default_data_file: 'tests/small-pull.data' + +# general configuration for runs +runs: + invocations: 10 + retries_after_failure: 3 + +benchmark_suites: + Suite: + gauge_adapter: TestExecutor + command: TestBenchMarks %(benchmark)s %(input)s %(variable)s + input_sizes: [2, 4, 8, 10] + variable_values: + - val1 + benchmarks: + - Bench1 + - Bench2 + +executors: + TestRunnerSeq: + path: tests + executable: test-vm2.py + TestRunnerPar: + path: tests + executable: test-vm2.py + execute_exclusively: false + +experiments: + TestSeq: + suites: + - Suite + executions: + - TestRunnerSeq + TestPar: + suites: + - Suite + executions: + - TestRunnerPar From d404b8efb9e3e66e4bc2033771ec27cc939e06eb Mon Sep 17 00:00:00 2001 From: Tobias Pape Date: Mon, 7 Oct 2019 15:57:40 +0200 Subject: [PATCH 9/9] fix typo --- rebench/rebench.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebench/rebench.py b/rebench/rebench.py index 539f79bc..fc1d61a7 100755 --- a/rebench/rebench.py +++ b/rebench/rebench.py @@ -224,7 +224,7 @@ def execute_experiment(self, runs): scheduler_class = {'batch': BatchScheduler, 'round-robin': RoundRobinScheduler, 'random': RandomScheduler, - 'pulling': PulingScheduler,}.get(self._config.options.scheduler) + 'pulling': PullingScheduler,}.get(self._config.options.scheduler) executor = Executor(runs, self._config.use_nice, self._config.do_builds, self._ui,
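A closing note on why the typo fixed here went unnoticed until the
last patch: the scheduler dispatch in rebench.py builds its dict
inside execute_experiment, and names in a function body are only
resolved when the function runs. A minimal sketch of that failure
mode, with stand-in classes:

```python
class BatchScheduler(object):
    """stand-in for rebench.executor.BatchScheduler"""

class PullingScheduler(object):
    """stand-in for rebench.executor.PullingScheduler"""

def pick_scheduler(name):
    # mirrors execute_experiment in rebench/rebench.py: the dict literal
    # is evaluated only on the call, so a misspelled class name (like
    # PulingScheduler) raises NameError at run time, not at import time
    return {'batch': BatchScheduler,
            'pulling': PullingScheduler}.get(name)

assert pick_scheduler('pulling') is PullingScheduler
assert pick_scheduler('no-such-scheduler') is None  # falls through silently
```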