Fully dynamic updates of run["cores"] and run["workers"].
We have replaced every occurrence of

task["active"] = False

by

set_inactive(task_id, run)

which does the required bookkeeping under the task_lock.

The intention is that the task_lock is only held for a very short
time. So in request_task we now grab it only at the moment when we
actually add a task to the run (before this change it was held
during the whole invocation of request_task).

If this works, this PR can serve as a model for other kinds of
bookkeeping, such as the number of committed games.
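As a minimal illustration of the pattern (a simplified sketch distilled from
the diff below, not a drop-in copy of the real class; the add_task helper and
the RunDbSketch name are hypothetical shorthand for the task-creation block in
sync_request_task), the bookkeeping is confined to short critical sections:

import threading

class RunDbSketch:
    # Simplified sketch of the new bookkeeping; see rundb.py below
    # for the actual implementation.
    def __init__(self):
        # Guards run["workers"], run["cores"] and the task "active" flags.
        self.task_lock = threading.Lock()

    def set_inactive(self, task_id, run):
        # Deactivate one task and keep the per-run counters consistent,
        # all under the lock.
        task = run["tasks"][task_id]
        with self.task_lock:
            if task["active"]:
                run["workers"] -= 1
                run["cores"] -= task["worker_info"]["concurrency"]
                task["active"] = False

    def add_task(self, run, task):
        # Hypothetical helper: in request_task the lock is now taken only
        # while the new task is appended and the counters are bumped.
        with self.task_lock:
            run["tasks"].append(task)
            run["workers"] += 1
            run["cores"] += task["worker_info"]["concurrency"]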
vdbergh committed May 17, 2024
1 parent 8a2a93c commit 0930646
Showing 3 changed files with 99 additions and 73 deletions.
4 changes: 4 additions & 0 deletions server/fishtest/__init__.py
@@ -66,6 +66,10 @@ def file_hash(file):
is_primary_instance = port == primary_port

rundb = RunDb(port=port, is_primary_instance=is_primary_instance)
# We do not want to do this in the constructor of rundb since
# it starts the flush timer.
if rundb.is_primary_instance():
rundb.update_workers_cores()

def add_rundb(event):
event.request.rundb = rundb
165 changes: 94 additions & 71 deletions server/fishtest/rundb.py
@@ -76,6 +76,42 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):

self.__is_primary_instance = is_primary_instance

self.task_lock = threading.Lock()
self.request_task_lock = threading.Lock()
self.timer = None
self.timer_active = True
if self.is_primary_instance():
self.update_workers_cores()

def set_inactive_run(self, run):
with self.task_lock:
run["workers"] = run["cores"] = 0
for task in run["tasks"]:
task["active"] = False

def set_inactive(self, task_id, run):
task = run["tasks"][task_id]
with self.task_lock:
if task["active"]:
run["workers"] -= 1
run["cores"] -= task["worker_info"]["concurrency"]
task["active"] = False

def update_workers_cores(self):
"""
If the code is correct then this should be a noop, unless after a crash.
"""
with self.task_lock:
for r in self.get_unfinished_runs_id():
workers = cores = 0
run = self.get_run(r["_id"])
for task in run["tasks"]:
if task["active"]:
workers += 1
cores += int(task["worker_info"]["concurrency"])
run["workers"], run["cores"] = workers, cores
self.buffer(run, False)

def new_run(
self,
base_tag,
@@ -308,12 +344,11 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):
run_cache_lock = threading.Lock()
run_cache_write_lock = threading.Lock()

timer = None

# handle termination
def exit_run(signum, frame):
global last_rundb
if last_rundb:
last_rundb.stop_timer()
last_rundb.flush_all()
if last_rundb.port >= 0:
last_rundb.actiondb.system_event(
@@ -348,9 +383,13 @@ def get_run(self, r_id):
else:
return self.runs.find_one({"_id": r_id_obj})

def stop_timer(self):
self.timer_active = False

def start_timer(self):
self.timer = threading.Timer(1.0, self.flush_buffers)
self.timer.start()
if self.timer_active:
self.timer = threading.Timer(1.0, self.flush_buffers)
self.timer.start()

def buffer(self, run, flush):
with self.run_cache_lock:
@@ -448,7 +487,7 @@ def scavenge(self, run):
for task in run["tasks"]:
task_id += 1
if task["active"] and task["last_updated"] < old:
task["active"] = False
self.set_inactive(task_id, run)
dead_task = True
print(
"dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
@@ -508,8 +547,6 @@ def aggregate_unfinished_runs(self, username=None):
if any(task["active"] for task in reversed(run["tasks"]))
else "pending"
)
if state == "pending":
run["workers"] = run["cores"] = 0
runs[state].append(run)
runs["pending"].sort(
key=lambda run: (
@@ -669,18 +706,10 @@ def calc_itp(self, run, count):

run["args"]["itp"] = itp

def update_workers_cores(self, run):
workers = cores = 0
for task in run["tasks"]:
if task["active"]:
workers += 1
cores += int(task["worker_info"]["concurrency"])
run["workers"], run["cores"] = workers, cores

# Limit concurrent request_task
# The semaphore must be initialized with a value
# less than the number of Waitress threads.
task_lock = threading.Lock()
# It is very important that the following semaphore is initialized
# with a value strictly less than the number of Waitress threads.

task_semaphore = threading.Semaphore(2)

task_time = 0
@@ -724,7 +753,7 @@ def blocked_worker_message(self, worker_name, message, host_url):
def request_task(self, worker_info):
if self.task_semaphore.acquire(False):
try:
with self.task_lock:
with self.request_task_lock:
return self.sync_request_task(worker_info)
finally:
self.task_semaphore.release()
@@ -777,7 +806,6 @@ def sync_request_task(self, worker_info):
self.task_runs = []
for r in self.get_unfinished_runs_id():
run = self.get_run(r["_id"])
self.update_workers_cores(run)
self.calc_itp(run, user_active.count(run["args"].get("username")))
self.task_runs.append(run)
self.task_time = time.time()
@@ -827,14 +855,17 @@ def priority(run): # lower is better
# get the list of active tasks

active_tasks = [
task for run in self.task_runs for task in run["tasks"] if task["active"]
(run, task_id, task)
for run in self.task_runs
for task_id, task in enumerate(run["tasks"])
if task["active"]
]

# We go through the list of active tasks to see if a worker with the same
# name is already connected.

now = datetime.now(timezone.utc)
for task in active_tasks:
for run, task_id, task in active_tasks:
task_name = worker_name(task["worker_info"], short=True)
if my_name == task_name:
task_name_long = worker_name(task["worker_info"])
@@ -847,7 +878,7 @@ def priority(run): # lower is better
f'Stale active task detected for worker "{my_name_long}". Correcting...',
flush=True,
)
task["active"] = False
self.set_inactive(task_id, run)
continue
last_update = (now - task["last_updated"]).seconds
# 120 = period of heartbeat in worker.
Expand All @@ -865,7 +896,7 @@ def priority(run): # lower is better

connections = 0
connections_limit = self.userdb.get_machine_limit(worker_info["username"])
for task in active_tasks:
for run, task_id, task in active_tasks:
if task["worker_info"]["remote_addr"] == worker_info["remote_addr"]:
connections += 1
if connections >= connections_limit:
@@ -983,38 +1014,40 @@ def priority(run): # lower is better
return {"task_waiting": False}

# Now we create a new task for this run.
opening_offset = 0
for task in run["tasks"]:
opening_offset += task["num_games"]
with self.task_lock:
opening_offset = 0
for task in run["tasks"]:
opening_offset += task["num_games"]

if "sprt" in run["args"]:
sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
remaining = sprt_batch_size_games * math.ceil(
remaining / sprt_batch_size_games
)
if "sprt" in run["args"]:
sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
remaining = sprt_batch_size_games * math.ceil(
remaining / sprt_batch_size_games
)

task_size = min(self.worker_cap(run, worker_info), remaining)
task = {
"num_games": task_size,
"active": True,
"worker_info": worker_info,
"last_updated": datetime.now(timezone.utc),
"start": opening_offset,
"stats": {
"wins": 0,
"losses": 0,
"draws": 0,
"crashes": 0,
"time_losses": 0,
"pentanomial": 5 * [0],
},
}
run["tasks"].append(task)
task_size = min(self.worker_cap(run, worker_info), remaining)
task = {
"num_games": task_size,
"active": True,
"worker_info": worker_info,
"last_updated": datetime.now(timezone.utc),
"start": opening_offset,
"stats": {
"wins": 0,
"losses": 0,
"draws": 0,
"crashes": 0,
"time_losses": 0,
"pentanomial": 5 * [0],
},
}
run["tasks"].append(task)

task_id = len(run["tasks"]) - 1

task_id = len(run["tasks"]) - 1
run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]

run["workers"] += 1
run["cores"] += task["worker_info"]["concurrency"]
self.buffer(run, False)

# Cache some data. Currently we record the id's
@@ -1167,7 +1200,7 @@ def count_games(d):
run_id, task_id
)
print(info, flush=True)
task["active"] = False
self.set_inactive(task_id, run)
self.buffer(run, True)
return {"task_alive": False, "info": info}

@@ -1198,7 +1231,7 @@ def count_games(d):

if error != "":
print(error, flush=True)
task["active"] = False
self.set_inactive(task_id, run)
return {"task_alive": False, "error": error}

# The update seems fine.
@@ -1226,26 +1259,18 @@ def count_games(d):
if num_games >= task["num_games"]:
# This task is now finished
task_finished = True
task["active"] = False
self.set_inactive(task_id, run)

# Now update the current run.

run["last_updated"] = update_time

if task_finished:
# run["cores"] is also updated in request_task().
# We use the same lock.
with self.task_lock:
run["workers"] -= 1
run["cores"] -= task["worker_info"]["concurrency"]
assert run["cores"] >= 0

if "sprt" in run["args"]:
sprt = run["args"]["sprt"]
fishtest.stats.stat_util.update_SPRT(run["results"], sprt)
if sprt["state"] != "":
task_finished = True
task["active"] = False
self.set_inactive(task_id, run)

if "spsa" in run["args"] and spsa_games == spsa["num_games"]:
self.update_spsa(task["worker_info"]["unique_key"], run, spsa)
@@ -1292,7 +1317,7 @@ def failed_task(self, run_id, task_id, message="Unknown reason"):
print(info, flush=True)
return {"task_alive": False, "info": info}
# Mark the task as inactive.
task["active"] = False
self.set_inactive(task_id, run)
self.handle_crash_or_time(run, task_id)
self.buffer(run, False)
print(
@@ -1319,17 +1344,15 @@ def stop_run(self, run_id):
"""
self.clear_params(run_id) # spsa stuff
run = self.get_run(run_id)
for task in run["tasks"]:
task["active"] = False
self.set_inactive_run(run)

results = run["results"]
run["results_info"] = format_results(results, run)
# De-couple the styling of the run from its finished status
if run["results_info"]["style"] == "#44EB44":
run["is_green"] = True
elif run["results_info"]["style"] == "yellow":
run["is_yellow"] = True
run["cores"] = 0
run["workers"] = 0
run["finished"] = True
try:
validate(runs_schema, run, "run")
@@ -1430,7 +1453,7 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
# For safety we also set the stats
# to zero.
task["bad"] = True
task["active"] = False
self.set_inactive(task_id, run)
task["stats"] = copy.deepcopy(zero_stats)

chi2 = get_chi2(run["tasks"])
@@ -1456,7 +1479,7 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
bad_task["bad"] = True
run["bad_tasks"].append(bad_task)
task["bad"] = True
task["active"] = False
self.set_inactive(task_id, run)
task["stats"] = copy.deepcopy(zero_stats)

if message == "":
3 changes: 1 addition & 2 deletions server/fishtest/views.py
@@ -1373,8 +1373,7 @@ def tests_delete(request):

run["deleted"] = True
run["finished"] = True
for task in run["tasks"]:
task["active"] = False
request.rundb.set_inactive_run(run)
request.rundb.buffer(run, True)
request.rundb.task_time = 0

Expand Down
