Every three minutes validate a random run from the cache.
The reason for this PR is that, before introducing more dynamic
updates, we would like to verify that the dynamic updates of
run["workers"] and run["cores"] introduced in
official-stockfish#2010
are bug-free.

To facilitate the use of periodic timers in Fishtest,
we have introduced a new scheduler in utils.py, which may be
interesting in its own right.

Currently the defined timers are the cache-flush timer and the
timer introduced in this PR.
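The diff for server/fishtest/util.py, which contains the new Scheduler class, did not render on this page. As a rough orientation, the sketch below is a minimal, hypothetical scheduler exposing only the interface exercised by the call sites in rundb.py further down: Scheduler(jitter=...), add_task(period, task), and stop(). The Event-based polling loop, the parameter names, and the exact jitter handling are assumptions, not the committed implementation.

import random
import threading
import time


class Scheduler:
    # Hypothetical minimal scheduler; only the constructor signature,
    # add_task(), and stop() are taken from the call sites in this commit.
    def __init__(self, jitter=0.0):
        self.jitter = jitter  # relative wakeup randomization (0.05 = +/-5%)
        self.tasks = []
        self.stopped = threading.Event()
        self.worker_thread = threading.Thread(target=self._loop, daemon=True)
        self.worker_thread.start()

    def add_task(self, period, task):
        # Run task() every `period` seconds, first due one period from now.
        self.tasks.append(
            {"period": period, "task": task, "due": time.time() + period}
        )

    def stop(self):
        self.stopped.set()

    def _loop(self):
        # Poll the task list until stop() is called.
        while not self.stopped.wait(timeout=0.1):
            now = time.time()
            for entry in self.tasks:
                if now >= entry["due"]:
                    offset = entry["period"] * self.jitter * random.uniform(-1.0, 1.0)
                    entry["due"] = now + entry["period"] + offset
                    try:
                        entry["task"]()
                    except Exception as e:
                        print(f"Scheduler: task raised {e!r}", flush=True)

With the registrations made in schedule_tasks() below, flush_buffers runs about once per second and validate_random_run about every 180 seconds; the 5% jitter spreads the validation wakeups over roughly ±9 seconds.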
vdbergh committed May 24, 2024
1 parent 7be6f51 commit 5c78bdc
Showing 4 changed files with 308 additions and 23 deletions.
2 changes: 1 addition & 1 deletion server/fishtest/__init__.py
@@ -100,7 +100,7 @@ def init_rundb(event):
         if rundb.is_primary_instance():
             signal.signal(signal.SIGINT, rundb.exit_run)
             signal.signal(signal.SIGTERM, rundb.exit_run)
-            rundb.start_timer()
+            rundb.schedule_tasks()
             rundb.update_workers_cores()
 
     config.add_subscriber(add_rundb, NewRequest)
66 changes: 47 additions & 19 deletions server/fishtest/rundb.py
@@ -17,11 +17,18 @@
 from bson.errors import InvalidId
 from bson.objectid import ObjectId
 from fishtest.actiondb import ActionDb
-from fishtest.schemas import RUN_VERSION, nn_schema, pgns_schema, runs_schema
+from fishtest.schemas import (
+    RUN_VERSION,
+    nn_schema,
+    pgns_schema,
+    runs_schema,
+    valid_aggregated_data,
+)
 from fishtest.stats.stat_util import SPRT_elo
 from fishtest.userdb import UserDb
 from fishtest.util import (
     GeneratorAsFileReader,
+    Scheduler,
     crash_or_time,
     estimate_game_duration,
     format_bounds,
@@ -73,8 +80,43 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
         self.__is_primary_instance = is_primary_instance
 
         self.request_task_lock = threading.Lock()
-        self.timer = None
-        self.timer_active = True
+        self.scheduler = None
+
+    def schedule_tasks(self):
+        if self.scheduler is None:
+            self.scheduler = Scheduler(jitter=0.05)
+        self.scheduler.add_task(1.0, self.flush_buffers)
+        self.scheduler.add_task(180.0, self.validate_random_run)
+
+    def validate_random_run(self):
+        run_list = list(self.run_cache.values())
+        if len(run_list) == 0:
+            print(
+                "Validate_random_run: cache empty. No runs to validate...", flush=True
+            )
+            return
+        run = random.choice(run_list)["run"]
+        run_id = str(run["_id"])
+        try:
+            # Make sure that the run object does not change while we are
+            # validating it.
+            with self.active_run_lock(run_id):
+                # We verify only the aggregated data since the other
+                # data is not synchronized and may be in a transient,
+                # inconsistent state.
+                validate(valid_aggregated_data, run, "run")
+            print(
+                f"Validate_random_run: validated aggregated data in cache run {run_id}...",
+                flush=True,
+            )
+        except ValidationError as e:
+            message = f"The run object {run_id} does not validate: {str(e)}"
+            print(message, flush=True)
+            if "version" in run and run["version"] >= RUN_VERSION:
+                self.actiondb.log_message(
+                    username="fishtest.system",
+                    message=message,
+                )
 
     def set_inactive_run(self, run):
         run_id = run["_id"]
@@ -348,7 +390,8 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):
 
     # handle termination
     def exit_run(self, signum, frame):
-        self.stop_timer()
+        if self.scheduler is not None:
+            self.scheduler.stop()
         self.flush_all()
         if self.port >= 0:
             self.actiondb.system_event(message=f"stop fishtest@{self.port}")
@@ -378,17 +421,6 @@ def get_run(self, r_id):
         else:
             return self.runs.find_one({"_id": r_id_obj})
 
-    def stop_timer(self):
-        if self.timer is not None:
-            self.timer.cancel()
-            self.timer = None
-        self.timer_active = False
-
-    def start_timer(self):
-        if self.timer_active:
-            self.timer = threading.Timer(1.0, self.flush_buffers)
-            self.timer.start()
-
     def buffer(self, run, flush):
         if not self.is_primary_instance():
             print(
@@ -438,8 +470,6 @@ def flush_all(self):
         print("done", flush=True)
 
     def flush_buffers(self):
-        if self.timer is None:
-            return
         try:
             self.run_cache_lock.acquire()
             now = time.time()
@@ -476,9 +506,7 @@
         except:
             print("Flush exception", flush=True)
         finally:
-            # Restart timer:
             self.run_cache_lock.release()
-            self.start_timer()
 
     def scavenge(self, run):
         if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300):
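For orientation, a run_cache entry sampled by validate_random_run has roughly the shape sketched below. The wrapper dict holding the run under the "run" key is inferred from the run_cache.values() usage above; the field values are invented, and a real run document carries many more fields (including the aggregated "results" that final_results_must_match checks against the per-task stats).

# Hypothetical run_cache entry, reduced to the fields this diff touches.
cache_entry = {
    "run": {
        "_id": "...",  # an ObjectId in the real database
        "version": 0,  # compared against RUN_VERSION before logging to the actiondb
        "workers": 2,  # aggregated: the number of active tasks
        "cores": 16,  # aggregated: the total concurrency of the active tasks
        "tasks": [
            {"active": True, "worker_info": {"concurrency": 8}},
            {"active": True, "worker_info": {"concurrency": 8}},
            {"active": False, "worker_info": {"concurrency": 4}},
        ],
    },
}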
40 changes: 37 additions & 3 deletions server/fishtest/schemas.py
@@ -486,9 +486,43 @@ def final_results_must_match(run):
         raise Exception(
             f"The final results {run['results']} do not match the computed results {rr}"
         )
-    else:
-        return True
+
+    return True
+
+
+def cores_must_match(run):
+    cores = 0
+    for t in run["tasks"]:
+        if t["active"]:
+            cores += t["worker_info"]["concurrency"]
+    if cores != run["cores"]:
+        raise Exception(
+            f"Cores mismatch. Cores from tasks: {cores}. Cores from "
+            f"run: {run['cores']}"
+        )
+
+    return True
+
+
+def workers_must_match(run):
+    workers = 0
+    for t in run["tasks"]:
+        if t["active"]:
+            workers += 1
+    if workers != run["workers"]:
+        raise Exception(
+            f"Workers mismatch. Workers from tasks: {workers}. Workers from "
+            f"run: {run['workers']}"
+        )
+
+    return True
+
+
+valid_aggregated_data = intersect(
+    final_results_must_match,
+    cores_must_match,
+    workers_must_match,
+)
 
 # The following schema only matches new runs. The old runs
 # are not compatible with it. For documentation purposes
@@ -655,5 +689,5 @@ def final_results_must_match(run):
     lax(ifthen({"deleted": True}, {"finished": True})),
     lax(ifthen({"finished": True}, {"workers": 0, "cores": 0})),
     lax(ifthen({"finished": True}, {"tasks": [{"active": False}, ...]})),
-    final_results_must_match,
+    valid_aggregated_data,
 )
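Here intersect() composes the three predicate schemas into a single one: a run satisfies valid_aggregated_data only if every component check passes. The combinator comes from the schema library Fishtest uses; the stand-in below is a self-contained sketch of its assumed semantics.

# Stand-in for intersect() under the assumed semantics: the combined
# schema accepts an object iff every component check accepts it, where
# each component raises on a mismatch (as the three checks above do).
def intersect(*checks):
    def combined(obj):
        for check in checks:
            check(obj)
        return True

    return combined

The validate(valid_aggregated_data, run, "run") call in rundb.py then fails with a ValidationError whose message names the first violated invariant.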
