Fully dynamic updates of run["cores"] and run["workers"].

We have replaced every occurrence of

task["active"] = False

with

set_inactive(task_id, run)

which does the required bookkeeping under the task_lock.
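
For reference, here is a condensed sketch of the two helpers that this
commit adds (their full definitions are in the rundb.py diff below;
the surrounding class scaffolding is omitted and the comments are ours):

    def set_inactive(self, task_id, run):
        # Deactivate a single task and keep run["workers"]/run["cores"]
        # consistent, all inside the same critical section.
        task = run["tasks"][task_id]
        with self.task_lock:
            if task["active"]:
                run["workers"] -= 1
                run["cores"] -= task["worker_info"]["concurrency"]
                task["active"] = False

    def set_inactive_run(self, run):
        # Deactivate every task of a run; the counters drop to zero in
        # the same critical section.
        with self.task_lock:
            run["workers"] = run["cores"] = 0
            for task in run["tasks"]:
                task["active"] = False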

The intention is that the task_lock is only held for a very short
time. So in request_task we now grab it only at the moment when we
actually add a task to the run (previously the lock was held during
the whole invocation of request_task).

If this works, then this PR can serve as a model for other kinds of
bookkeeping, such as the number of committed games.
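
Schematically, the locking in request_task now looks as follows (a
condensed illustration of the diff below; error paths and most of the
task construction are elided):

    def request_task(self, worker_info):
        if self.task_semaphore.acquire(False):
            try:
                # A dedicated lock still serializes whole task requests
                # (previously task_lock played this role).
                with self.request_task_lock:
                    return self.sync_request_task(worker_info)
            finally:
                self.task_semaphore.release()
        ...

    def sync_request_task(self, worker_info):
        ...
        # task_lock is held only while the new task is built, attached
        # to the run, and the worker/core counters are updated.
        with self.task_lock:
            task = ...  # the new task record, see the diff
            run["tasks"].append(task)
            task_id = len(run["tasks"]) - 1
            run["workers"] += 1
            run["cores"] += task["worker_info"]["concurrency"]
        self.buffer(run, False)
        ...
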
vdbergh committed May 16, 2024
1 parent 8a2a93c commit 315ce21
Showing 2 changed files with 86 additions and 69 deletions.
152 changes: 85 additions & 67 deletions server/fishtest/rundb.py
@@ -76,6 +76,40 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):

self.__is_primary_instance = is_primary_instance

self.task_lock = threading.Lock()
self.request_task_lock = threading.Lock()
if self.is_primary_instance():
self.update_workers_cores()

def set_inactive_run(self, run):
with self.task_lock:
run["workers"] = run["cores"] = 0
for task in run["tasks"]:
task["active"] = False

def set_inactive(self, task_id, run):
task = run["tasks"][task_id]
with self.task_lock:
if task["active"]:
run["workers"] -= 1
run["cores"] -= task["worker_info"]["concurrency"]
task["active"] = False

def update_workers_cores(self):
"""
If the code is correct then this should be a noop, unless after a crash.
"""
with self.task_lock:
for r in self.get_unfinished_runs_id():
workers = cores = 0
run = self.get_run(r["_id"])
for task in run["tasks"]:
if task["active"]:
workers += 1
cores += int(task["worker_info"]["concurrency"])
run["workers"], run["cores"] = workers, cores
self.buffer(run, True)

def new_run(
self,
base_tag,
@@ -448,7 +482,7 @@ def scavenge(self, run):
for task in run["tasks"]:
task_id += 1
if task["active"] and task["last_updated"] < old:
task["active"] = False
self.set_inactive(task_id, run)
dead_task = True
print(
"dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
@@ -508,8 +542,6 @@ def aggregate_unfinished_runs(self, username=None):
if any(task["active"] for task in reversed(run["tasks"]))
else "pending"
)
if state == "pending":
run["workers"] = run["cores"] = 0
runs[state].append(run)
runs["pending"].sort(
key=lambda run: (
@@ -669,18 +701,10 @@ def calc_itp(self, run, count):

run["args"]["itp"] = itp

def update_workers_cores(self, run):
workers = cores = 0
for task in run["tasks"]:
if task["active"]:
workers += 1
cores += int(task["worker_info"]["concurrency"])
run["workers"], run["cores"] = workers, cores

# Limit concurrent request_task
# The semaphore must be initialized with a value
# less than the number of Waitress threads.
task_lock = threading.Lock()
# It is very important that the following semaphore is initialized
# with a value strictly less than the number of Waitress threads.

task_semaphore = threading.Semaphore(2)

task_time = 0
@@ -724,7 +748,7 @@ def blocked_worker_message(self, worker_name, message, host_url):
def request_task(self, worker_info):
if self.task_semaphore.acquire(False):
try:
with self.task_lock:
with self.request_task_lock:
return self.sync_request_task(worker_info)
finally:
self.task_semaphore.release()
@@ -777,7 +801,6 @@ def sync_request_task(self, worker_info):
self.task_runs = []
for r in self.get_unfinished_runs_id():
run = self.get_run(r["_id"])
self.update_workers_cores(run)
self.calc_itp(run, user_active.count(run["args"].get("username")))
self.task_runs.append(run)
self.task_time = time.time()
@@ -827,14 +850,17 @@ def priority(run): # lower is better
# get the list of active tasks

active_tasks = [
task for run in self.task_runs for task in run["tasks"] if task["active"]
(run, task_id, task)
for run in self.task_runs
for task_id, task in enumerate(run["tasks"])
if task["active"]
]

# We go through the list of active tasks to see if a worker with the same
# name is already connected.

now = datetime.now(timezone.utc)
for task in active_tasks:
for run, task_id, task in active_tasks:
task_name = worker_name(task["worker_info"], short=True)
if my_name == task_name:
task_name_long = worker_name(task["worker_info"])
@@ -847,7 +873,7 @@ def priority(run): # lower is better
f'Stale active task detected for worker "{my_name_long}". Correcting...',
flush=True,
)
task["active"] = False
self.set_inactive(task_id, run)
continue
last_update = (now - task["last_updated"]).seconds
# 120 = period of heartbeat in worker.
@@ -865,7 +891,7 @@ def priority(run): # lower is better

connections = 0
connections_limit = self.userdb.get_machine_limit(worker_info["username"])
for task in active_tasks:
for run, task_id, task in active_tasks:
if task["worker_info"]["remote_addr"] == worker_info["remote_addr"]:
connections += 1
if connections >= connections_limit:
@@ -983,38 +1009,40 @@ def priority(run): # lower is better
return {"task_waiting": False}

# Now we create a new task for this run.
opening_offset = 0
for task in run["tasks"]:
opening_offset += task["num_games"]
with self.task_lock:
opening_offset = 0
for task in run["tasks"]:
opening_offset += task["num_games"]

if "sprt" in run["args"]:
sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
remaining = sprt_batch_size_games * math.ceil(
remaining / sprt_batch_size_games
)
if "sprt" in run["args"]:
sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
remaining = sprt_batch_size_games * math.ceil(
remaining / sprt_batch_size_games
)

task_size = min(self.worker_cap(run, worker_info), remaining)
task = {
"num_games": task_size,
"active": True,
"worker_info": worker_info,
"last_updated": datetime.now(timezone.utc),
"start": opening_offset,
"stats": {
"wins": 0,
"losses": 0,
"draws": 0,
"crashes": 0,
"time_losses": 0,
"pentanomial": 5 * [0],
},
}
run["tasks"].append(task)
task_size = min(self.worker_cap(run, worker_info), remaining)
task = {
"num_games": task_size,
"active": True,
"worker_info": worker_info,
"last_updated": datetime.now(timezone.utc),
"start": opening_offset,
"stats": {
"wins": 0,
"losses": 0,
"draws": 0,
"crashes": 0,
"time_losses": 0,
"pentanomial": 5 * [0],
},
}
run["tasks"].append(task)

task_id = len(run["tasks"]) - 1

task_id = len(run["tasks"]) - 1
run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]

run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]
self.buffer(run, False)

# Cache some data. Currently we record the id's
@@ -1167,7 +1195,7 @@ def count_games(d):
run_id, task_id
)
print(info, flush=True)
task["active"] = False
self.set_inactive(task_id, run)
self.buffer(run, True)
return {"task_alive": False, "info": info}

@@ -1198,7 +1226,7 @@ def count_games(d):

if error != "":
print(error, flush=True)
task["active"] = False
self.set_inactive(task_id, run)
return {"task_alive": False, "error": error}

# The update seems fine.
@@ -1226,26 +1254,18 @@ def count_games(d):
if num_games >= task["num_games"]:
# This task is now finished
task_finished = True
task["active"] = False
self.set_inactive(task_id, run)

# Now update the current run.

run["last_updated"] = update_time

if task_finished:
# run["cores"] is also updated in request_task().
# We use the same lock.
with self.task_lock:
run["workers"] -= 1
run["cores"] -= task["worker_info"]["concurrency"]
assert run["cores"] >= 0

if "sprt" in run["args"]:
sprt = run["args"]["sprt"]
fishtest.stats.stat_util.update_SPRT(run["results"], sprt)
if sprt["state"] != "":
task_finished = True
task["active"] = False
self.set_inactive(task_id, run)

if "spsa" in run["args"] and spsa_games == spsa["num_games"]:
self.update_spsa(task["worker_info"]["unique_key"], run, spsa)
@@ -1292,7 +1312,7 @@ def failed_task(self, run_id, task_id, message="Unknown reason"):
print(info, flush=True)
return {"task_alive": False, "info": info}
# Mark the task as inactive.
task["active"] = False
self.set_inactive(task_id, run)
self.handle_crash_or_time(run, task_id)
self.buffer(run, False)
print(
@@ -1319,17 +1339,15 @@ def stop_run(self, run_id):
"""
self.clear_params(run_id) # spsa stuff
run = self.get_run(run_id)
for task in run["tasks"]:
task["active"] = False
self.set_inactive_run(run)

results = run["results"]
run["results_info"] = format_results(results, run)
# De-couple the styling of the run from its finished status
if run["results_info"]["style"] == "#44EB44":
run["is_green"] = True
elif run["results_info"]["style"] == "yellow":
run["is_yellow"] = True
run["cores"] = 0
run["workers"] = 0
run["finished"] = True
try:
validate(runs_schema, run, "run")
@@ -1430,7 +1448,7 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
# For safety we also set the stats
# to zero.
task["bad"] = True
task["active"] = False
self.set_inactive(task_id, run)
task["stats"] = copy.deepcopy(zero_stats)

chi2 = get_chi2(run["tasks"])
@@ -1456,7 +1474,7 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
bad_task["bad"] = True
run["bad_tasks"].append(bad_task)
task["bad"] = True
task["active"] = False
self.set_inactive(task_id, run)
task["stats"] = copy.deepcopy(zero_stats)

if message == "":
3 changes: 1 addition & 2 deletions server/fishtest/views.py
@@ -1373,8 +1373,7 @@ def tests_delete(request):

run["deleted"] = True
run["finished"] = True
for task in run["tasks"]:
task["active"] = False
request.rundb.set_inactive_run(run)
request.rundb.buffer(run, True)
request.rundb.task_time = 0

