def schedule_tasks(self):
    """Create the background scheduler and register the periodic tasks.

    Does nothing when a scheduler has already been created. Otherwise a
    Scheduler with a relative jitter of 0.05 is created, ``flush_buffers``
    is registered with a period of 1 second and ``validate_random_run``
    with a period of 180 seconds.
    """
    if self.scheduler is not None:
        return
    self.scheduler = Scheduler(jitter=0.05)
    self.scheduler.add_task(1.0, self.flush_buffers)
    self.scheduler.add_task(180.0, self.validate_random_run)


def validate_random_run(self):
    """Validate the aggregated data of one randomly chosen cached run.

    A validation failure is printed and, when the run carries a "version"
    of at least RUN_VERSION, also logged through the action db under the
    user name "fishtest.system".
    """
    cached = list(self.run_cache.values())
    if not cached:
        print(
            "Validate_random_run: cache empty. No runs to validate...", flush=True
        )
        return
    run = random.choice(cached)["run"]
    run_id = str(run["_id"])
    try:
        # Make sure that the run object does not change while we are
        # validating it
        with self.active_run_lock(run_id):
            # We verify only the aggregated data since the other
            # data is not synchronized and may be in a transient
            # inconsistent state
            validate(valid_aggregated_data, run, "run")
            print(
                f"Validate_random_run: validated aggregated data in cache run {run_id}...",
                flush=True,
            )
    except ValidationError as e:
        message = f"The run object {run_id} does not validate: {e}"
        print(message, flush=True)
        if "version" in run and run["version"] >= RUN_VERSION:
            self.actiondb.log_message(
                username="fishtest.system",
                message=message,
            )
def cores_must_match(run):
    """Check that run["cores"] equals the summed concurrency of the active tasks.

    :param run: the run object to check
    :type run: dict

    :raises Exception: if the two core counts differ

    :rtype: bool
    """
    cores = sum(
        task["worker_info"]["concurrency"] for task in run["tasks"] if task["active"]
    )
    if cores != run["cores"]:
        raise Exception(
            f"Cores mismatch. Cores from tasks: {cores}. Cores from "
            f"run: {run['cores']}"
        )
    return True


def workers_must_match(run):
    """Check that run["workers"] equals the number of active tasks.

    :param run: the run object to check
    :type run: dict

    :raises Exception: if the two worker counts differ

    :rtype: bool
    """
    workers = sum(1 for task in run["tasks"] if task["active"])
    if workers != run["workers"]:
        raise Exception(
            f"Workers mismatch. Workers from tasks: {workers}. Workers from "
            f"run: {run['workers']}"
        )
    return True


# The checks on the aggregated fields of a run (results, cores, workers),
# combined so that they can be validated together.
valid_aggregated_data = intersect(
    final_results_must_match,
    cores_must_match,
    workers_must_match,
)
class blocked_signals:
    """Context manager that blocks signals for the duration of a ``with`` block.

    :param mask: the signals to block, defaults to all valid signals
    :type mask: set, optional
    """

    def __init__(self, mask=None):
        if mask is None:
            mask = signal.valid_signals()
        self.mask = mask

    def __enter__(self):
        # pthread_sigmask() returns the previous mask; remember it so that
        # __exit__ can restore exactly what was in effect before.
        self.old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, self.mask)

    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.pthread_sigmask(signal.SIG_SETMASK, self.old_mask)
"""
The following scheduling code should be thread safe.

- First and foremost, all tasks are executed in a single worker thread
owned by the scheduler. So they are atomic. In particular, during its
lifetime, a task will be executed exactly once at each scheduling point.

- The worker thread maintains a list of scheduled tasks. To safely manipulate
this list outside the worker thread we rely on the atomicity of in-place
list operations in Python.

- To signal the worker thread that the task list has changed, which should
be acted upon as soon as possible as it might affect the next task to
be executed, we use a threading.Event.

Example

s=Scheduler()
s.add_task(3, task1)
s.add_task(2, task2)

When the second task is scheduled, the scheduler will interrupt the
3s wait for the first task and replace it by a 2s wait for the second task.

- Starting and stopping the scheduler is synchronized via a lock. Since these
methods could be called in signal handlers, we also block signals.
"""


class Task:
    """This is an opaque class representing a task. Instances should be created via
    Scheduler.add_task().

    :param period: The period (in seconds) after which the task will repeat
    :type period: float

    :param worker: A callable that executes the task
    :type worker: Callable

    :param initial_delay: The delay (in seconds) before the first execution, defaults to period
    :type initial_delay: float, optional

    :param one_shot: If true, execute the task only once, defaults to False
    :type one_shot: bool, optional

    :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to 0.0
    :type jitter: float, optional

    :param args: Arguments passed to the worker, defaults to ()
    :type args: tuple, optional

    :param kwargs: Keyword arguments passed to the worker, defaults to no keyword arguments
    :type kwargs: dict, optional
    """

    def __init__(
        self,
        period,
        worker,
        initial_delay=None,
        one_shot=False,
        jitter=0.0,
        args=(),
        kwargs=None,
    ):
        self.period = timedelta(seconds=period)
        self.worker = worker
        if initial_delay is None:
            initial_delay = self.period
        else:
            initial_delay = timedelta(seconds=initial_delay)
        # The jitter is stored as a timedelta so that it can be added
        # directly to the schedule.
        self.__rel_jitter = jitter * self.period
        self.__next_schedule = (
            datetime.now(timezone.utc)
            + initial_delay
            + uniform(-self.__rel_jitter, self.__rel_jitter)
        )
        self.one_shot = one_shot
        self.__expired = False
        self.args = args
        # A fresh dict per task: a shared mutable default would leak
        # modifications from one task to all the others.
        self.kwargs = {} if kwargs is None else kwargs

    def do_work(self):
        """Run the worker once and compute the next scheduling point.

        Exceptions raised by the worker are printed and otherwise ignored,
        so that a failing task cannot take down the scheduler thread. A
        one shot task expires after its first execution.
        """
        if self.__expired:
            return
        try:
            # kwargs must be unpacked with **; a single * would pass the
            # dict keys as extra positional arguments.
            self.worker(*self.args, **self.kwargs)
        except Exception as e:
            print(f"Exception while executing task: {e}", flush=True)
        if self.one_shot:
            self.__expired = True
        else:
            self.__next_schedule += self.period + uniform(
                -self.__rel_jitter, self.__rel_jitter
            )

    def next_schedule(self):
        """Return the (timezone aware) time of the next execution."""
        return self.__next_schedule

    def expired(self):
        """Return True if the task will not be executed anymore."""
        return self.__expired


class Scheduler:
    """This creates a scheduler

    :param jitter: the default value for the task jitter (see below), defaults to 0.0
    :type jitter: float, optional

    :param auto_start: Start scheduling when the scheduler is created. For a small number
      of tasks this is appropriate. When scheduling many tasks it is better to use
      auto_start=False and then to start the scheduler manually with self.start(), defaults to True
    :type auto_start: bool, optional
    """

    def __init__(self, jitter=0.0, auto_start=True):
        """Constructor method"""
        self.__tasks = []
        self.__event = threading.Event()
        self.__thread_stopped = True
        self.jitter = jitter
        self.__worker_thread = None
        self.__stop_start_lock = threading.Lock()
        if auto_start:
            self.start()

    def add_task(
        self,
        period,
        worker,
        initial_delay=None,
        one_shot=False,
        jitter=None,
        args=(),
        kwargs=None,
    ):
        """This schedules a new task.

        :param period: The period after which the task will repeat
        :type period: float

        :param worker: A callable that executes the task
        :type worker: Callable

        :param initial_delay: The delay before the first execution of the task, defaults to period
        :type initial_delay: float, optional

        :param one_shot: If true, execute the task only once, defaults to False
        :type one_shot: bool, optional

        :param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter
        :type jitter: float, optional

        :param args: Arguments passed to the worker, defaults to ()
        :type args: tuple, optional

        :param kwargs: Keyword arguments passed to the worker, defaults to no keyword arguments
        :type kwargs: dict, optional

        :rtype: Task
        """
        if jitter is None:
            jitter = self.jitter
        task = Task(
            period,
            worker,
            initial_delay=initial_delay,
            one_shot=one_shot,
            jitter=jitter,
            args=args,
            kwargs=kwargs,
        )
        # Append first, then wake the worker thread: it clears the event
        # before scanning the list, so the new task cannot be missed.
        self.__tasks.append(task)
        self.__event.set()
        return task

    def del_task(self, task):
        """This deletes a task

        :param task: The task to be deleted
        :type task: Task
        """
        self.__del_task(task)
        self.__event.set()

    def start(self):
        """This starts the scheduler if it was created with auto_start=False"""
        with blocked_signals():
            with self.__stop_start_lock:
                if not self.__thread_stopped:
                    return
                self.__thread_stopped = False
                self.__worker_thread = threading.Thread(target=self.__next_schedule)
                self.__worker_thread.start()

    def stop(self):
        """This stops the scheduler"""
        with blocked_signals():
            with self.__stop_start_lock:
                # The flag must be set before the event: the worker thread
                # re-checks the flag right after clearing the event.
                self.__thread_stopped = True
                thread = self.__worker_thread
                if thread is not None:
                    self.__event.set()
                    # A task may call stop(); a thread cannot join itself.
                    if thread is not threading.current_thread():
                        thread.join()

    def __del_task(self, task):
        try:
            self.__tasks.remove(task)
        except ValueError:
            # The task was already removed.
            pass

    def __next_schedule(self):
        """Main loop of the worker thread."""
        while True:
            self.__event.clear()
            # Check the flag only after clearing the event. A stop() that
            # raised the flag before the clear is seen here; one that
            # raises it afterwards sets the event again, which ends the
            # wait below. Either way the wakeup cannot be lost.
            if self.__thread_stopped:
                break
            next_task = None
            # Iterate over a snapshot since the list may change meanwhile.
            for task in list(self.__tasks):
                if task.expired():
                    self.__del_task(task)
                elif (
                    next_task is None
                    or task.next_schedule() < next_task.next_schedule()
                ):
                    next_task = task
            if next_task is None:
                # Nothing to do until a task is added or stop() is called.
                self.__event.wait()
                continue
            delay = (
                next_task.next_schedule() - datetime.now(timezone.utc)
            ).total_seconds()
            # wait() returns False on timeout, i.e. when nobody signalled a
            # change and the task is due. An overdue task runs at once.
            if not self.__event.wait(max(0.0, delay)):
                next_task.do_work()