Skip to content

Commit

Permalink
Every three minutes validate a random run from the cache.
Browse files Browse the repository at this point in the history
The reason for this PR is that before introducing more dynamic
updates we would like to verify that the dynamic updates of
run["workers"] and run["cores"] introduced in
official-stockfish#2010
are bug free.

In order to facilitate the use of periodic timers in Fishtest
we have introduced a new scheduler in utils.py which may be
interesting in its own right.

Currently the timers defined are the cache flush timer and the
timer introduced in this PR.
  • Loading branch information
vdbergh committed May 23, 2024
1 parent 7be6f51 commit 3fdc3e9
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 23 deletions.
2 changes: 1 addition & 1 deletion server/fishtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def init_rundb(event):
if rundb.is_primary_instance():
signal.signal(signal.SIGINT, rundb.exit_run)
signal.signal(signal.SIGTERM, rundb.exit_run)
rundb.start_timer()
rundb.schedule_tasks()
rundb.update_workers_cores()

config.add_subscriber(add_rundb, NewRequest)
Expand Down
66 changes: 47 additions & 19 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
from bson.errors import InvalidId
from bson.objectid import ObjectId
from fishtest.actiondb import ActionDb
from fishtest.schemas import RUN_VERSION, nn_schema, pgns_schema, runs_schema
from fishtest.schemas import (
RUN_VERSION,
nn_schema,
pgns_schema,
runs_schema,
valid_aggregated_data,
)
from fishtest.stats.stat_util import SPRT_elo
from fishtest.userdb import UserDb
from fishtest.util import (
GeneratorAsFileReader,
Scheduler,
crash_or_time,
estimate_game_duration,
format_bounds,
Expand Down Expand Up @@ -73,8 +80,43 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
self.__is_primary_instance = is_primary_instance

self.request_task_lock = threading.Lock()
self.timer = None
self.timer_active = True
self.scheduler = None

def schedule_tasks(self):
if self.scheduler is None:
self.scheduler = Scheduler(jitter=0.05)
self.scheduler.add_task(1.0, self.flush_buffers)
self.scheduler.add_task(180.0, self.validate_random_run)

def validate_random_run(self):
run_list = list(self.run_cache.values())
if len(run_list) == 0:
print(
"Validate_random_run: cache empty. No runs to validate...", flush=True
)
return
run = random.choice(list(run_list))["run"]
run_id = str(run["_id"])
try:
# Make sure that the run object does not change while we are
# validating it
with self.active_run_lock(run_id):
# We verify only the aggregated data since the other
# data is not synchronized and may be in a transient
# inconsistent state
validate(valid_aggregated_data, run, "run")
print(
f"Validate_random_run: validated aggregated data in cache run {run_id}...",
flush=True,
)
except ValidationError as e:
message = f"The run object {run_id} does not validate: {str(e)}"
print(message, flush=True)
if "version" in run and run["version"] >= RUN_VERSION:
self.actiondb.log_message(
username="fishtest.system",
message=message,
)

def set_inactive_run(self, run):
run_id = run["_id"]
Expand Down Expand Up @@ -348,7 +390,8 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):

# handle termination
def exit_run(self, signum, frame):
self.stop_timer()
if self.scheduler is not None:
self.scheduler.stop()
self.flush_all()
if self.port >= 0:
self.actiondb.system_event(message=f"stop fishtest@{self.port}")
Expand Down Expand Up @@ -378,17 +421,6 @@ def get_run(self, r_id):
else:
return self.runs.find_one({"_id": r_id_obj})

def stop_timer(self):
if self.timer is not None:
self.timer.cancel()
self.timer = None
self.timer_active = False

def start_timer(self):
if self.timer_active:
self.timer = threading.Timer(1.0, self.flush_buffers)
self.timer.start()

def buffer(self, run, flush):
if not self.is_primary_instance():
print(
Expand Down Expand Up @@ -438,8 +470,6 @@ def flush_all(self):
print("done", flush=True)

def flush_buffers(self):
if self.timer is None:
return
try:
self.run_cache_lock.acquire()
now = time.time()
Expand Down Expand Up @@ -476,9 +506,7 @@ def flush_buffers(self):
except:
print("Flush exception", flush=True)
finally:
# Restart timer:
self.run_cache_lock.release()
self.start_timer()

def scavenge(self, run):
if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300):
Expand Down
40 changes: 37 additions & 3 deletions server/fishtest/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,43 @@ def final_results_must_match(run):
raise Exception(
f"The final results {run['results']} do not match the computed results {rr}"
)
else:
return True

return True


def cores_must_match(run):
cores = 0
for t in run["tasks"]:
if t["active"]:
cores += t["worker_info"]["concurrency"]
if cores != run["cores"]:
raise Exception(
f"Cores mismatch. Cores from tasks: {cores}. Cores from "
f"run: {run['cores']}"
)

return True


def workers_must_match(run):
workers = 0
for t in run["tasks"]:
if t["active"]:
workers += 1
if workers != run["workers"]:
raise Exception(
f"Workers mismatch. Workers from tasks: {workers}. Workers from "
f"run: {run['workers']}"
)

return True


valid_aggregated_data = intersect(
final_results_must_match,
cores_must_match,
workers_must_match,
)

# The following schema only matches new runs. The old runs
# are not compatible with it. For documentation purposes
Expand Down Expand Up @@ -655,5 +689,5 @@ def final_results_must_match(run):
lax(ifthen({"deleted": True}, {"finished": True})),
lax(ifthen({"finished": True}, {"workers": 0, "cores": 0})),
lax(ifthen({"finished": True}, {"tasks": [{"active": False}, ...]})),
final_results_must_match,
valid_aggregated_data,
)
204 changes: 204 additions & 0 deletions server/fishtest/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import math
import re
import smtplib
import threading
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from functools import cache
from random import uniform

import fishtest.stats.stat_util
import numpy
Expand Down Expand Up @@ -545,3 +547,205 @@ def strip_run(run):
stripped[key] = str(run[key])

return stripped


"""
The following scheduling code should be thread safe.
- First and foremost, all tasks are executed in a single main thread.
So they are atomic. In particular, during its lifetime, a task will be
executed exactly once at each scheduling point.
- The main thread maintains a list of scheduled tasks. To safely manipulate
this list outside the main thread we rely on the atomicity of in-place
list operations in Python.
- To signal the main thread that the task list has changed, which should
be acted upon as soon as possible as it might affect the next task to
be executed, we use a threading.Event.
Example
s=Scheduler()
s.add_task(3, task1)
s.add_task(2, task2)
When the second task is scheduled, the scheduler will interrupt the
3s wait for the first task and replace it by a 2s wait for the second task.
"""


class Task:
"""This is an opaque class representing a task. Instances should be created via
Scheduler.add_task().
"""

def __init__(
self,
period,
worker,
initial_delay=None,
one_shot=False,
jitter=0.0,
args=(),
kwargs={},
):
self.period = timedelta(seconds=period)
self.worker = worker
if initial_delay is None:
initial_delay = self.period
else:
initial_delay = timedelta(seconds=initial_delay)
self.__rel_jitter = jitter * self.period
self.__next_schedule = (
datetime.now(timezone.utc)
+ initial_delay
+ uniform(-self.__rel_jitter, self.__rel_jitter)
)
self.one_shot = one_shot
self.__expired = False
self.args = args
self.kwargs = kwargs

def do_work(self):
if self.__expired:
return
try:
self.worker(*self.args, *self.kwargs)
except Exception as e:
print(f"Exception while executing task: {str(e)}", flush=True)
if not self.one_shot:
self.__next_schedule += self.period + uniform(
-self.__rel_jitter, self.__rel_jitter
)
else:
self.__expired = True

def next_schedule(self):
return self.__next_schedule

def expired(self):
return self.__expired


class Scheduler:
"""This creates a scheduler
:param jitter: the default value for the task jitter (see below), defaults to 0.0
:type jitter: float, optional
:param auto_start: Start scheduling when the scheduler is created. For a small number
of tasks this is appropriate. When scheduling many tasks it is better to use
auto_start=False and then to start the scheduler manually with self.start(), defaults to True
:type auto_start: bool, optional
"""

def __init__(self, jitter=0.0, auto_start=True):
"""Constructor method"""
self.__tasks = []
self.__event = threading.Event()
self.__timer_stopped = True
self.jitter = jitter
# dummy task to avoid special casing an empty task list
self.add_task(3600.0, lambda: None, jitter=0.1)
self.__worker_thread = None
self.__start_lock = threading.Lock()
if auto_start:
self.start()

def add_task(
self,
period,
worker,
initial_delay=None,
one_shot=False,
jitter=None,
args=(),
kwargs={},
):
"""This schedules a new task.
:param period: The period after which the task will repeat
:type period: float
:param worker: A callable that executes the task
:type worker: Callable
:param initial_delay: The delay before the first execution of the task, defaults to period
:type initial_delay: float, optional
:param one_shot: If true, execute the task only once, defaults to False
:type one_shot: bool, optional
:param jitter: Add random element of [-jitter*period, jitter*period] to delays, defaults to self.jitter
:type jitter: float, optional
:param args: Arguments passed to the worker, defaults to ()
:type args: tuple, optional
:param kwargs: Keyword arguments passed to the worker, defaults to {}
:type kwargs: dict, optional
:rtype: Task
"""
if jitter is None:
jitter = self.jitter
task = Task(
period,
worker,
initial_delay=initial_delay,
one_shot=one_shot,
jitter=jitter,
args=args,
kwargs=kwargs,
)
self.__tasks.append(task)
self.__event.set()
return task

def del_task(self, task):
"""This deletes a task
:param task: The task to be deleted
:type task: Task
"""
self.__del_task(task)
self.__event.set()

def start(self):
"""This starts the scheduler if it was started with auto_start=False"""
with self.__start_lock:
if not self.__timer_stopped:
return
self.__timer_stopped = False
self.__worker_thread = threading.Thread(target=self.__next_schedule)
self.__worker_thread.start()

def stop(self):
"""This stops the scheduler"""
self.__timer_stopped = True
if self.__worker_thread is not None:
self.__event.set()
self.__worker_thread.join()

def __del_task(self, task):
try:
self.__tasks.remove(task)
except Exception:
pass

def __next_schedule(self):
while not self.__timer_stopped:
self.__event.clear()
next_schedule = None
for task in copy.copy(self.__tasks):
if task.expired():
self.__del_task(task)
else:
if next_schedule is None or task.next_schedule() < next_schedule:
next_task = task
next_schedule = task.next_schedule()
delay = (next_schedule - datetime.now(timezone.utc)).total_seconds()
self.__event.wait(delay)
if not self.__event.is_set():
next_task.do_work()

0 comments on commit 3fdc3e9

Please sign in to comment.