Skip to content

Commit

Permalink
SchedulerMonitor: Apply flow check in record
Browse files Browse the repository at this point in the history
  • Loading branch information
Deepskyhunter committed Jul 29, 2022
1 parent 0efa3db commit 99a1314
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions artiq/test/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
from artiq.master.scheduler import Scheduler
from sipyco.sync_struct import process_mod

basic_flow = ["pending", "preparing", "prepare_done", "running",
"run_done", "analyzing", "deleting"]

flush_flow = ["pending", "flushing", "preparing", "prepare_done",
"running", "run_done", "analyzing", "deleting"]

class EmptyExperiment(EnvExperiment):
def build(self):
Expand Down Expand Up @@ -66,27 +61,32 @@ def get(self):
return rid

class SchedulerMonitor:
def __init__(self):
def __init__(self, test):
self.test = test
self.experiments = {}
self.last_status = {}
self.exp_flow = {}
self.flags = {"arrive": {}, "leave": {}}
self.flow_map = { # current status -> possible move
"": {"pending"},
"pending": {"preparing", "flushing", "deleting"},
"preparing": {"prepare_done", "deleting"},
"prepare_done": {"running", "deleting"},
"running": {"run_done", "paused", "deleting"},
"run_done": {"analyzing", "deleting"},
"analyzing": {"deleting"},
"deleting": {},
"paused" : {"running"},
"flushing" : {"preparing"}
}

def record(self, mod):
process_mod(self.experiments, mod)
for key, value in self.experiments.items():
if key not in self.last_status.keys():
self.last_status[key] = ""
self.exp_flow[key] = []
current_status = self.experiments[key]["status"]
if current_status != self.last_status[key]:
if self.exp_flow[key]:
self.exp_flow[key][-1]["out_time"] = time()
self.exp_flow[key].append({
"status": current_status,
"in_time": time(),
"out_time": "never"
})
self.test.assertIn(current_status, self.flow_map[self.last_status[key]])

if key in self.flags["arrive"].keys():
if current_status in self.flags["arrive"][key].keys():
Expand All @@ -98,9 +98,6 @@ def record(self, mod):
self.last_status[key] = current_status
return

def get_status_order(self, rid):
return [step["status"] for step in self.exp_flow[rid]]

async def wait_until(self, rid, condition, status):
# condition : "arrive", "leave"
if self.last_status[rid] == status and condition == "arrive":
Expand All @@ -123,6 +120,7 @@ def remove_flag(self, rid, condition, status):
if not self.flags[condition][rid]:
del self.flags[condition][rid]


class SchedulerCase(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
Expand All @@ -132,7 +130,7 @@ def test_steps(self):
loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None, None)
expid = _get_expid("EmptyExperiment")
monitor = SchedulerMonitor()
monitor = SchedulerMonitor(self)
scheduler.notifier.publish = monitor.record
scheduler.start()

Expand All @@ -145,9 +143,7 @@ def test_steps(self):
scheduler.submit("main", expid, 0, None, False)

loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting"))
self.assertEqual(monitor.get_status_order(1), basic_flow)
self.assertEqual(monitor.last_status[0], "pending")
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())

def test_pending_priority(self):
Expand All @@ -170,7 +166,7 @@ def test_pending_priority(self):
late = time() + 100000
early = time() + 1

monitor = SchedulerMonitor()
monitor = SchedulerMonitor(self)
scheduler.notifier.publish = monitor.record
scheduler.start()

Expand All @@ -190,8 +186,6 @@ def test_pending_priority(self):
task.cancel()

loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting"))
self.assertEqual(monitor.get_status_order(2), basic_flow)
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())

def test_pause(self):
Expand All @@ -213,7 +207,7 @@ def check_termination(mod):
expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment")

monitor = SchedulerMonitor()
monitor = SchedulerMonitor(self)
scheduler.notifier.publish = monitor.record
scheduler.start()
# check_pause is True when rid with higher priority is prepare_done
Expand All @@ -226,7 +220,6 @@ def check_termination(mod):
self.assertTrue(scheduler.check_pause(0))
loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting"))
self.assertFalse(scheduler.check_pause(0))
self.assertEqual(monitor.get_status_order(1), basic_flow)

# check_pause is True when request_termination is called
self.assertFalse(termination_ok)
Expand All @@ -247,7 +240,7 @@ def test_close_with_active_runs(self):
expid_bg = _get_expid("BackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed.
expid_bg["log_level"] = logging.CRITICAL
monitor = SchedulerMonitor()
monitor = SchedulerMonitor(self)
expid = _get_expid("EmptyExperiment")

scheduler.notifier.publish = monitor.record
Expand All @@ -270,7 +263,7 @@ def test_flush(self):
expid = _get_expid("EmptyExperiment")
expid_bg = _get_expid("CheckPauseBackgroundExperiment")
expid_bg["log_level"] = logging.CRITICAL
monitor = SchedulerMonitor()
monitor = SchedulerMonitor(self)

scheduler.notifier.publish = monitor.record
scheduler.start()
Expand All @@ -280,8 +273,6 @@ def test_flush(self):
loop.run_until_complete(monitor.wait_until(1, "arrive", "preparing"))
self.assertEqual(monitor.last_status[0], "deleting")
loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting"))
self.assertEqual(monitor.get_status_order(0), basic_flow)
self.assertEqual(monitor.get_status_order(1), flush_flow)

# Flush with higher priority
scheduler.submit("main", expid_bg, 0, None, False)
Expand All @@ -290,7 +281,6 @@ def test_flush(self):
scheduler.submit("main", expid, 1, None, True)
loop.run_until_complete(monitor.wait_until(3, "arrive", "deleting"))
self.assertEqual(monitor.last_status[2], "running")
self.assertEqual(monitor.get_status_order(3), flush_flow)

loop.run_until_complete(scheduler.stop())

Expand Down

0 comments on commit 99a1314

Please sign in to comment.