From 315ce21853d66170f93abe00dd648176e8ec519f Mon Sep 17 00:00:00 2001
From: Michel Van den Bergh
Date: Tue, 14 May 2024 16:47:39 +0000
Subject: [PATCH] Fully dynamic updates of run["cores"] and run["workers"].

We have replaced every occurrence of task["active"] = False with
set_inactive(task_id, run), which performs the required bookkeeping
under the task_lock.

The intention is that the task_lock is only held for a very short
time. Hence, in request_task we grab it only at the moment we actually
add a task to the run (currently it is held for the entire invocation
of request_task).

If this works, then this PR can serve as a model for other kinds of
bookkeeping, such as the number of committed games. A minimal sketch
of the locking pattern follows the diff below.
---
 server/fishtest/rundb.py | 152 ++++++++++++++++++++++-----------------
 server/fishtest/views.py |   3 +-
 2 files changed, 86 insertions(+), 69 deletions(-)

diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py
index 3d3771b61..f7eacef92 100644
--- a/server/fishtest/rundb.py
+++ b/server/fishtest/rundb.py
@@ -76,6 +76,40 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
 
         self.__is_primary_instance = is_primary_instance
 
+        self.task_lock = threading.Lock()
+        self.request_task_lock = threading.Lock()
+        if self.is_primary_instance():
+            self.update_workers_cores()
+
+    def set_inactive_run(self, run):
+        with self.task_lock:
+            run["workers"] = run["cores"] = 0
+            for task in run["tasks"]:
+                task["active"] = False
+
+    def set_inactive(self, task_id, run):
+        task = run["tasks"][task_id]
+        with self.task_lock:
+            if task["active"]:
+                run["workers"] -= 1
+                run["cores"] -= task["worker_info"]["concurrency"]
+                task["active"] = False
+
+    def update_workers_cores(self):
+        """
+        If the code is correct then this should be a no-op, except after a crash.
+        """
+        with self.task_lock:
+            for r in self.get_unfinished_runs_id():
+                workers = cores = 0
+                run = self.get_run(r["_id"])
+                for task in run["tasks"]:
+                    if task["active"]:
+                        workers += 1
+                        cores += int(task["worker_info"]["concurrency"])
+                run["workers"], run["cores"] = workers, cores
+                self.buffer(run, True)
+
     def new_run(
         self,
         base_tag,
@@ -448,7 +482,7 @@ def scavenge(self, run):
         for task in run["tasks"]:
             task_id += 1
             if task["active"] and task["last_updated"] < old:
-                task["active"] = False
+                self.set_inactive(task_id, run)
                 dead_task = True
                 print(
                     "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
@@ -508,8 +542,6 @@ def aggregate_unfinished_runs(self, username=None):
                 if any(task["active"] for task in reversed(run["tasks"]))
                 else "pending"
             )
-            if state == "pending":
-                run["workers"] = run["cores"] = 0
             runs[state].append(run)
         runs["pending"].sort(
             key=lambda run: (
@@ -669,18 +701,10 @@ def calc_itp(self, run, count):
 
         run["args"]["itp"] = itp
 
-    def update_workers_cores(self, run):
-        workers = cores = 0
-        for task in run["tasks"]:
-            if task["active"]:
-                workers += 1
-                cores += int(task["worker_info"]["concurrency"])
-        run["workers"], run["cores"] = workers, cores
-
-    # Limit concurrent request_task
-    # The semaphore must be initialized with a value
-    # less than the number of Waitress threads.
-    task_lock = threading.Lock()
+    # It is very important that the following semaphore is initialized
+    # with a value strictly less than the number of Waitress threads.
+
     task_semaphore = threading.Semaphore(2)
 
     task_time = 0
@@ -724,7 +748,7 @@ def blocked_worker_message(self, worker_name, message, host_url):
     def request_task(self, worker_info):
         if self.task_semaphore.acquire(False):
             try:
-                with self.task_lock:
+                with self.request_task_lock:
                     return self.sync_request_task(worker_info)
             finally:
                 self.task_semaphore.release()
@@ -777,7 +801,6 @@ def sync_request_task(self, worker_info):
             self.task_runs = []
             for r in self.get_unfinished_runs_id():
                 run = self.get_run(r["_id"])
-                self.update_workers_cores(run)
                 self.calc_itp(run, user_active.count(run["args"].get("username")))
                 self.task_runs.append(run)
             self.task_time = time.time()
@@ -827,14 +850,17 @@ def priority(run):  # lower is better
 
         # get the list of active tasks
         active_tasks = [
-            task for run in self.task_runs for task in run["tasks"] if task["active"]
+            (run, task_id, task)
+            for run in self.task_runs
+            for task_id, task in enumerate(run["tasks"])
+            if task["active"]
         ]
 
         # We go through the list of active tasks to see if a worker with the same
        # name is already connected.
 
         now = datetime.now(timezone.utc)
-        for task in active_tasks:
+        for run, task_id, task in active_tasks:
             task_name = worker_name(task["worker_info"], short=True)
             if my_name == task_name:
                 task_name_long = worker_name(task["worker_info"])
@@ -847,7 +873,7 @@ def priority(run):  # lower is better
                         f'Stale active task detected for worker "{my_name_long}". Correcting...',
                         flush=True,
                     )
-                    task["active"] = False
+                    self.set_inactive(task_id, run)
                     continue
                 last_update = (now - task["last_updated"]).seconds
                 # 120 = period of heartbeat in worker.
@@ -865,7 +891,7 @@ def priority(run):  # lower is better
 
         connections = 0
         connections_limit = self.userdb.get_machine_limit(worker_info["username"])
-        for task in active_tasks:
+        for run, task_id, task in active_tasks:
             if task["worker_info"]["remote_addr"] == worker_info["remote_addr"]:
                 connections += 1
                 if connections >= connections_limit:
@@ -983,38 +1009,40 @@ def priority(run):  # lower is better
             return {"task_waiting": False}
 
         # Now we create a new task for this run.
-        opening_offset = 0
-        for task in run["tasks"]:
-            opening_offset += task["num_games"]
+        with self.task_lock:
+            opening_offset = 0
+            for task in run["tasks"]:
+                opening_offset += task["num_games"]
 
-        if "sprt" in run["args"]:
-            sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
-            remaining = sprt_batch_size_games * math.ceil(
-                remaining / sprt_batch_size_games
-            )
+            if "sprt" in run["args"]:
+                sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
+                remaining = sprt_batch_size_games * math.ceil(
+                    remaining / sprt_batch_size_games
+                )
 
-        task_size = min(self.worker_cap(run, worker_info), remaining)
-        task = {
-            "num_games": task_size,
-            "active": True,
-            "worker_info": worker_info,
-            "last_updated": datetime.now(timezone.utc),
-            "start": opening_offset,
-            "stats": {
-                "wins": 0,
-                "losses": 0,
-                "draws": 0,
-                "crashes": 0,
-                "time_losses": 0,
-                "pentanomial": 5 * [0],
-            },
-        }
-        run["tasks"].append(task)
+            task_size = min(self.worker_cap(run, worker_info), remaining)
+            task = {
+                "num_games": task_size,
+                "active": True,
+                "worker_info": worker_info,
+                "last_updated": datetime.now(timezone.utc),
+                "start": opening_offset,
+                "stats": {
+                    "wins": 0,
+                    "losses": 0,
+                    "draws": 0,
+                    "crashes": 0,
+                    "time_losses": 0,
+                    "pentanomial": 5 * [0],
+                },
+            }
+            run["tasks"].append(task)
+
+            task_id = len(run["tasks"]) - 1
 
-        task_id = len(run["tasks"]) - 1
+            run["workers"] += 1
+            run["cores"] += task["worker_info"]["concurrency"]
 
-        run["workers"] += 1
-        run["cores"] += task["worker_info"]["concurrency"]
         self.buffer(run, False)
 
         # Cache some data. Currently we record the id's
@@ -1167,7 +1195,7 @@ def count_games(d):
                 run_id, task_id
             )
             print(info, flush=True)
-            task["active"] = False
+            self.set_inactive(task_id, run)
             self.buffer(run, True)
             return {"task_alive": False, "info": info}
 
@@ -1198,7 +1226,7 @@ def count_games(d):
 
         if error != "":
             print(error, flush=True)
-            task["active"] = False
+            self.set_inactive(task_id, run)
             return {"task_alive": False, "error": error}
 
         # The update seems fine.
@@ -1226,26 +1254,18 @@ def count_games(d):
         if num_games >= task["num_games"]:
             # This task is now finished
             task_finished = True
-            task["active"] = False
+            self.set_inactive(task_id, run)
 
         # Now update the current run.
 
         run["last_updated"] = update_time
 
-        if task_finished:
-            # run["cores"] is also updated in request_task().
-            # We use the same lock.
-            with self.task_lock:
-                run["workers"] -= 1
-                run["cores"] -= task["worker_info"]["concurrency"]
-                assert run["cores"] >= 0
-
         if "sprt" in run["args"]:
             sprt = run["args"]["sprt"]
             fishtest.stats.stat_util.update_SPRT(run["results"], sprt)
             if sprt["state"] != "":
                 task_finished = True
-                task["active"] = False
+                self.set_inactive(task_id, run)
 
         if "spsa" in run["args"] and spsa_games == spsa["num_games"]:
             self.update_spsa(task["worker_info"]["unique_key"], run, spsa)
@@ -1292,7 +1312,7 @@ def failed_task(self, run_id, task_id, message="Unknown reason"):
             print(info, flush=True)
             return {"task_alive": False, "info": info}
         # Mark the task as inactive.
-        task["active"] = False
+        self.set_inactive(task_id, run)
         self.handle_crash_or_time(run, task_id)
         self.buffer(run, False)
         print(
@@ -1319,8 +1339,8 @@ def stop_run(self, run_id):
         """
         self.clear_params(run_id)  # spsa stuff
         run = self.get_run(run_id)
-        for task in run["tasks"]:
-            task["active"] = False
+        self.set_inactive_run(run)
+
         results = run["results"]
         run["results_info"] = format_results(results, run)
         # De-couple the styling of the run from its finished status
@@ -1328,8 +1348,6 @@
             run["is_green"] = True
         elif run["results_info"]["style"] == "yellow":
             run["is_yellow"] = True
-        run["cores"] = 0
-        run["workers"] = 0
         run["finished"] = True
         try:
             validate(runs_schema, run, "run")
@@ -1430,7 +1448,7 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
                     # For safety we also set the stats
                    # to zero.
                     task["bad"] = True
-                    task["active"] = False
+                    self.set_inactive(task_id, run)
                     task["stats"] = copy.deepcopy(zero_stats)
 
         chi2 = get_chi2(run["tasks"])
@@ -1456,7 +1474,7 @@
                 bad_task["bad"] = True
                 run["bad_tasks"].append(bad_task)
                 task["bad"] = True
-                task["active"] = False
+                self.set_inactive(task_id, run)
                 task["stats"] = copy.deepcopy(zero_stats)
 
         if message == "":

diff --git a/server/fishtest/views.py b/server/fishtest/views.py
index 279ea6c3b..6f54af9cc 100644
--- a/server/fishtest/views.py
+++ b/server/fishtest/views.py
@@ -1373,8 +1373,7 @@ def tests_delete(request):
 
     run["deleted"] = True
     run["finished"] = True
-    for task in run["tasks"]:
-        task["active"] = False
+    request.rundb.set_inactive_run(run)
     request.rundb.buffer(run, True)
     request.rundb.task_time = 0
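
Below is the minimal, runnable sketch of the locking pattern referenced
in the commit message. MiniRunDb, add_task, and the plain run/task dicts
are simplified stand-ins chosen for illustration; they are not the real
fishtest schema or API. The point is that every task state transition
updates run["workers"] and run["cores"] inside one short critical
section, and that set_inactive is idempotent, so its many call sites
(scavenge, update_task, failed_task, stop_run, purge_run) cannot
double-decrement the counters.

import threading


class MiniRunDb:
    def __init__(self):
        # Guards the per-run worker/core counters. Held only for the few
        # statements that touch them, never across a whole request.
        self.task_lock = threading.Lock()

    def add_task(self, run, worker_info):
        # Mirrors the tail of sync_request_task: append the task and
        # update the counters in a single short critical section.
        task = {"active": True, "worker_info": worker_info}
        with self.task_lock:
            run["tasks"].append(task)
            run["workers"] += 1
            run["cores"] += worker_info["concurrency"]
            return len(run["tasks"]) - 1

    def set_inactive(self, task_id, run):
        # The "active" guard makes this idempotent: racing callers
        # decrement the counters at most once per task.
        task = run["tasks"][task_id]
        with self.task_lock:
            if task["active"]:
                run["workers"] -= 1
                run["cores"] -= task["worker_info"]["concurrency"]
                task["active"] = False


if __name__ == "__main__":
    db = MiniRunDb()
    run = {"tasks": [], "workers": 0, "cores": 0}
    task_id = db.add_task(run, {"concurrency": 8})
    db.set_inactive(task_id, run)
    db.set_inactive(task_id, run)  # second call is a harmless no-op
    assert run["workers"] == 0 and run["cores"] == 0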