From 9191d74fbe90104b0582644e407f1cd55d216531 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Tue, 21 Jun 2022 16:59:49 +0800 Subject: [PATCH 01/35] Add alternative to test case Mark cases that has alternative at alternative_case. Include the alternative case in the corresponding position in expect[] with list. Consider alternative case during assertEqual --- artiq/test/test_scheduler.py | 50 +++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 854f17a939..e07159d243 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -199,18 +199,30 @@ def test_pending_priority(self): "value": "prepare_done", "key": "status" }, - { + [{ "path": [0], "action": "setitem", "value": "running", "key": "status" - }, - { + }, + { "path": [2], "action": "setitem", "value": "preparing", "key": "status" - }, + }], + [{ + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "preparing", + "key": "status" + }], { "path": [2], "action": "setitem", @@ -235,18 +247,30 @@ def test_pending_priority(self): "value": "run_done", "key": "status" }, - { + [{ "path": [0], "action": "setitem", "value": "running", "key": "status" - }, - { + }, + { "path": [2], "action": "setitem", "value": "analyzing", "key": "status" - }, + }], + [{ + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "analyzing", + "key": "status" + }], { "path": [2], "action": "setitem", @@ -259,11 +283,19 @@ def test_pending_priority(self): "key": 2 }, ] + alternative_case = [5, 6, 11, 12] done = asyncio.Event() expect_idx = 0 + def notify(mod): nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) + if expect_idx in alternative_case: + if mod == expect[expect_idx][0]: + self.assertEqual(mod, expect[expect_idx][0]) + else: + self.assertEqual(mod, expect[expect_idx][1]) + else: + self.assertEqual(mod, expect[expect_idx]) expect_idx += 1 if expect_idx >= len(expect): done.set() From 6f89438afcfe789c7b2e1ca40e833997a55d9275 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 22 Jun 2022 10:11:42 +0800 Subject: [PATCH 02/35] Revert "Add alternative to test case" This reverts commit 9191d74fbe90104b0582644e407f1cd55d216531. --- artiq/test/test_scheduler.py | 50 +++++++----------------------------- 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index e07159d243..854f17a939 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -199,30 +199,18 @@ def test_pending_priority(self): "value": "prepare_done", "key": "status" }, - [{ - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "preparing", - "key": "status" - }], - [{ + { "path": [0], "action": "setitem", "value": "running", "key": "status" - }, - { + }, + { "path": [2], "action": "setitem", "value": "preparing", "key": "status" - }], + }, { "path": [2], "action": "setitem", @@ -247,30 +235,18 @@ def test_pending_priority(self): "value": "run_done", "key": "status" }, - [{ - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "analyzing", - "key": "status" - }], - [{ + { "path": [0], "action": "setitem", "value": "running", "key": "status" - }, - { + }, + { "path": [2], "action": "setitem", "value": "analyzing", "key": "status" - }], + }, { "path": [2], "action": "setitem", @@ -283,19 +259,11 @@ def test_pending_priority(self): "key": 2 }, ] - alternative_case = [5, 6, 11, 12] done = asyncio.Event() expect_idx = 0 - def notify(mod): nonlocal expect_idx - if expect_idx in alternative_case: - if mod == expect[expect_idx][0]: - self.assertEqual(mod, expect[expect_idx][0]) - else: - self.assertEqual(mod, expect[expect_idx][1]) - else: - self.assertEqual(mod, expect[expect_idx]) + self.assertEqual(mod, expect[expect_idx]) expect_idx += 1 if expect_idx >= len(expect): done.set() From 0c1da822cc653b47a553d2964333f5c9c20d9d89 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 22 Jun 2022 10:18:21 +0800 Subject: [PATCH 03/35] Modify expect and _get_basic_steps and create last_state_RID Add parameter due_date to _get_basic_steps() for more application Create expectRID for each experiment by using _get_basic_steps() Put expectRIDs into expect Create last_state_RID for checking --- artiq/test/test_scheduler.py | 136 +++++------------------------------ 1 file changed, 19 insertions(+), 117 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 854f17a939..4f9eef08c7 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -50,11 +50,11 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, flush=False): +def _get_basic_steps(rid, expid, priority=0, flush=False, due_date=None): return [ {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": priority, - "expid": expid, "due_date": None, "flush": flush, + "expid": expid, "due_date": due_date, "flush": flush, "repo_msg": None}, "path": []}, {"action": "setitem", "key": "status", "value": "preparing", @@ -144,121 +144,23 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - expect = [ - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": low_priority, - "pipeline": "main", - "due_date": None, - "status": "pending", - "expid": expid_bg, - "flush": False - }, - "key": 0 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": high_priority, - "pipeline": "main", - "due_date": late, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 1 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": middle_priority, - "pipeline": "main", - "due_date": early, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 2 - }, - { - "path": [0], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "paused", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "run_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "analyzing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "deleting", - "key": "status" - }, - { - "path": [], - "action": "delitem", - "key": 2 - }, - ] + expectRID0 = _get_basic_steps(0, expid_bg, low_priority) + expectRID1 = _get_basic_steps(1, expid_empty, high_priority, late) + expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, early) + expectRID0.insert(4, + {"action": "setitem", "key": "status", "value": "pasued", + "path": [0]}) + expectRID0.insert(5, + {"action": "setitem", "key": "status", "value": "running", + "path": [0]}) + + expect = [expectRID0, expectRID1, expectRID2] + + last_state_RID0 = {"action": "setitem", "key": "status", + "value": "run_done", "path": [0]} + last_state_RID1 = {"action": "setitem", "key": "status", + "value": "preparing", "path": [1]} + done = asyncio.Event() expect_idx = 0 def notify(mod): From ea53aba1582a0275c5be83153dae75e109a279cf Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 22 Jun 2022 11:06:16 +0800 Subject: [PATCH 04/35] Modify notify to suit the test case and change order of a function First identify the RID Then check the state of the corresponding RID Last check the state of each RID when one of them passed all state Change _get_basic_steps parameters' order to common order --- artiq/test/test_scheduler.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 4f9eef08c7..06c6b17c93 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -50,7 +50,7 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, flush=False, due_date=None): +def _get_basic_steps(rid, expid, priority=0, due_date=None, flush = False): return [ {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": priority, @@ -148,27 +148,33 @@ def test_pending_priority(self): expectRID1 = _get_basic_steps(1, expid_empty, high_priority, late) expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, early) expectRID0.insert(4, - {"action": "setitem", "key": "status", "value": "pasued", + {"action": "setitem", "key": "status", "value": "paused", "path": [0]}) expectRID0.insert(5, {"action": "setitem", "key": "status", "value": "running", "path": [0]}) - + expect = [expectRID0, expectRID1, expectRID2] - + last_state_RID0 = {"action": "setitem", "key": "status", "value": "run_done", "path": [0]} last_state_RID1 = {"action": "setitem", "key": "status", "value": "preparing", "path": [1]} done = asyncio.Event() - expect_idx = 0 + def notify(mod): - nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): + if type(mod["key"]) is int: + rid = mod["key"] + else: + rid = mod["path"][0] + self.assertEqual(mod, expect[rid].pop(0)) + if len(expect[rid]) == 0: + self.assertEqual(rid, 2) + self.assertEqual(expect[0][0], last_state_RID0) + self.assertEqual(expect[1][0], last_state_RID1) done.set() + scheduler.notifier.publish = notify scheduler.start() From 52c6bb3c75736541f16cd8d90c203c78ecb774dd Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 22 Jun 2022 14:25:03 +0800 Subject: [PATCH 05/35] Re-order the parameters in _get_basic_steps Previous change in parameters' order may result in different input of existing code. Put due_date at the back to avoid it. Update the code that used this function --- artiq/test/test_scheduler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 06c6b17c93..ae5564ddf1 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -50,7 +50,7 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, due_date=None, flush = False): +def _get_basic_steps(rid, expid, priority=0, flush = False, due_date=None): return [ {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": priority, @@ -145,8 +145,10 @@ def test_pending_priority(self): early = time() + 1 expectRID0 = _get_basic_steps(0, expid_bg, low_priority) - expectRID1 = _get_basic_steps(1, expid_empty, high_priority, late) - expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, early) + expectRID1 = _get_basic_steps(1, expid_empty, high_priority, + due_date=late) + expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, + due_date=early) expectRID0.insert(4, {"action": "setitem", "key": "status", "value": "paused", "path": [0]}) From a9f121773636e9ea5247ec954c0cd0fb4a881397 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 22 Jun 2022 17:22:38 +0800 Subject: [PATCH 06/35] Remove space in args in _get_basic_steps --- artiq/test/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index ae5564ddf1..aa099ed45b 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -50,7 +50,7 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, flush = False, due_date=None): +def _get_basic_steps(rid, expid, priority=0, flush=False, due_date=None): return [ {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": priority, From a09236dee30a54c43aa81e351eaff49efa85d511 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 23 Jun 2022 09:58:13 +0800 Subject: [PATCH 07/35] Rename last_state to expect_next_state and Add comments --- artiq/test/test_scheduler.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index aa099ed45b..7dd35ce418 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -149,23 +149,30 @@ def test_pending_priority(self): due_date=late) expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, due_date=early) + #RID0 paused because RID2 reach due_date and has higher priority expectRID0.insert(4, {"action": "setitem", "key": "status", "value": "paused", "path": [0]}) + #RID0 resume running after RID finish running expectRID0.insert(5, {"action": "setitem", "key": "status", "value": "running", "path": [0]}) expect = [expectRID0, expectRID1, expectRID2] - last_state_RID0 = {"action": "setitem", "key": "status", + #RID0 will never finish running + expect_next_state_RID0 = {"action": "setitem", "key": "status", "value": "run_done", "path": [0]} - last_state_RID1 = {"action": "setitem", "key": "status", + #RID1 will never be preparing + expect_next_state_RID1 = {"action": "setitem", "key": "status", "value": "preparing", "path": [1]} + #RID2 will go through all stages so it doesn't have expect_next_state done = asyncio.Event() def notify(mod): + #Identify the rid + # Two possible location of rid, 1) "key" 2) "path"[0] if type(mod["key"]) is int: rid = mod["key"] else: @@ -173,8 +180,8 @@ def notify(mod): self.assertEqual(mod, expect[rid].pop(0)) if len(expect[rid]) == 0: self.assertEqual(rid, 2) - self.assertEqual(expect[0][0], last_state_RID0) - self.assertEqual(expect[1][0], last_state_RID1) + self.assertEqual(expect[0][0], expect_next_state_RID0) + self.assertEqual(expect[1][0], expect_next_state_RID1) done.set() scheduler.notifier.publish = notify From 04a3c73fbf7f2daac32d96af303c96c922ce2d0d Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 23 Jun 2022 10:04:35 +0800 Subject: [PATCH 08/35] Flake8 correction --- artiq/test/test_scheduler.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 7dd35ce418..789135d333 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -149,29 +149,29 @@ def test_pending_priority(self): due_date=late) expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, due_date=early) - #RID0 paused because RID2 reach due_date and has higher priority + # RID0 paused because RID2 reach due_date and has higher priority expectRID0.insert(4, - {"action": "setitem", "key": "status", "value": "paused", - "path": [0]}) - #RID0 resume running after RID finish running + {"action": "setitem", "key": "status", + "value": "paused", "path": [0]}) + # RID0 resume running after RID finish running expectRID0.insert(5, - {"action": "setitem", "key": "status", "value": "running", - "path": [0]}) + {"action": "setitem", "key": "status", + "value": "running", "path": [0]}) expect = [expectRID0, expectRID1, expectRID2] - #RID0 will never finish running + # RID0 will never finish running expect_next_state_RID0 = {"action": "setitem", "key": "status", - "value": "run_done", "path": [0]} - #RID1 will never be preparing + "value": "run_done", "path": [0]} + # RID1 will never be preparing expect_next_state_RID1 = {"action": "setitem", "key": "status", - "value": "preparing", "path": [1]} - #RID2 will go through all stages so it doesn't have expect_next_state + "value": "preparing", "path": [1]} + # RID2 will go through all stages so it doesn't have expect_next_state done = asyncio.Event() def notify(mod): - #Identify the rid + # Identify the rid # Two possible location of rid, 1) "key" 2) "path"[0] if type(mod["key"]) is int: rid = mod["key"] From 6f00de068aa585ca877e13b7c5570eab389238e7 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 23 Jun 2022 15:42:09 +0800 Subject: [PATCH 09/35] Keep track of each RID and its change from pending to preparing Use schedule to record changes from mod. Mark down the order of rid which status changes from pending to preparing in pending_to_preparing. --- artiq/test/test_scheduler.py | 47 +++++++++--------------------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 789135d333..049b7af743 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -6,6 +6,7 @@ from artiq.experiment import * from artiq.master.scheduler import Scheduler +from sipyco.sync_struct import process_mod class EmptyExperiment(EnvExperiment): @@ -144,44 +145,20 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - expectRID0 = _get_basic_steps(0, expid_bg, low_priority) - expectRID1 = _get_basic_steps(1, expid_empty, high_priority, - due_date=late) - expectRID2 = _get_basic_steps(2, expid_empty, middle_priority, - due_date=early) - # RID0 paused because RID2 reach due_date and has higher priority - expectRID0.insert(4, - {"action": "setitem", "key": "status", - "value": "paused", "path": [0]}) - # RID0 resume running after RID finish running - expectRID0.insert(5, - {"action": "setitem", "key": "status", - "value": "running", "path": [0]}) - - expect = [expectRID0, expectRID1, expectRID2] - - # RID0 will never finish running - expect_next_state_RID0 = {"action": "setitem", "key": "status", - "value": "run_done", "path": [0]} - # RID1 will never be preparing - expect_next_state_RID1 = {"action": "setitem", "key": "status", - "value": "preparing", "path": [1]} - # RID2 will go through all stages so it doesn't have expect_next_state - done = asyncio.Event() + schedule = {} + pending_to_preparing = [] + def notify(mod): - # Identify the rid - # Two possible location of rid, 1) "key" 2) "path"[0] - if type(mod["key"]) is int: - rid = mod["key"] - else: - rid = mod["path"][0] - self.assertEqual(mod, expect[rid].pop(0)) - if len(expect[rid]) == 0: - self.assertEqual(rid, 2) - self.assertEqual(expect[0][0], expect_next_state_RID0) - self.assertEqual(expect[1][0], expect_next_state_RID1) + process_mod(schedule, mod) + for key in schedule: + if schedule[key]["status"] == "preparing": + if key not in pending_to_preparing: + pending_to_preparing.append(key) + # Expect only two rid would go to preparing + if len(pending_to_preparing) == 2: + self.assertEqual(pending_to_preparing, [0, 2]) done.set() scheduler.notifier.publish = notify From 0488c9476ade8058f152f9452948045520c182cc Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Tue, 12 Jul 2022 15:04:56 +0800 Subject: [PATCH 10/35] test_scheduler: Skip next check after reached done state Two RID would change status at the same time when one of them entering done state (prepare_done and run_done). So once any of them enter that state, checking in next notify would be skipped. --- artiq/test/test_scheduler.py | 46 ++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 049b7af743..ac91ebd373 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -145,20 +145,46 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - done = asyncio.Event() - schedule = {} - pending_to_preparing = [] + + expect = [ + {0: "pending"}, + {0: "pending", 1: "pending"}, + {0: "pending", 1: "pending", 2: "pending"}, + {0: "preparing", 1: "pending", 2: "pending"}, + {0: "prepare_done", 1: "pending", 2: "pending"}, + {0: "running", 1: "pending", 2: "preparing"}, + {0: "running", 1: "pending", 2: "prepare_done"}, + {0: "paused", 1: "pending", 2: "running"}, + {0: "paused", 1: "pending", 2: "run_done"}, + {0: "running", 1: "pending", 2: "analyzing"}, + {0: "running", 1: "pending", 2: "deleting"}, + ] + + done = asyncio.Event() + expect_idx = 0 + skip_next = False def notify(mod): + nonlocal expect_idx, skip_next process_mod(schedule, mod) - for key in schedule: - if schedule[key]["status"] == "preparing": - if key not in pending_to_preparing: - pending_to_preparing.append(key) - # Expect only two rid would go to preparing - if len(pending_to_preparing) == 2: - self.assertEqual(pending_to_preparing, [0, 2]) + + # gather status of each RID + current_status = {} + for rid, info in schedule.items(): + current_status[rid] = info["status"] + + # skip once after prepare_done or run_done + if skip_next: + skip_next = False + else: + self.assertEqual(current_status, expect[expect_idx]) + expect_idx += 1 + if "prepare_done" in current_status.values() or\ + "run_done" in current_status.values(): + skip_next = True + + if expect_idx >= len(expect): done.set() scheduler.notifier.publish = notify From e9b1383acebb0f5d65ab01746f80d98e22696e06 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 13 Jul 2022 15:09:36 +0800 Subject: [PATCH 11/35] test_scheduler: Add SchedulerMonitor Use it in test_pending_priority check_status to see the update of schedulers is expected or not --- artiq/test/test_scheduler.py | 67 +++++++++++++++++------------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index ac91ebd373..7382948d67 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -83,6 +83,26 @@ def get(self): self._next_rid += 1 return rid +class SchedulerMonitor(): + def __init__(self, expect_status): + self.expect_status = expect_status + self.schedulers = {} + self.current_status = {} + + def check_status(self, test): + for key, value in self.schedulers.items(): + if key not in self.current_status.keys(): + self.current_status[key] = "" + if self.schedulers[key]["status"] != self.current_status[key]: + test.assertEqual(self.schedulers[key]["status"], self.expect_status[key][0]) + self.current_status[key] = self.expect_status[key].pop(0) + + def finished(self, test): + for value in self.expect_status.values(): + if value: + return False + return True + class SchedulerCase(unittest.TestCase): def setUp(self): @@ -145,46 +165,23 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - schedule = {} - - expect = [ - {0: "pending"}, - {0: "pending", 1: "pending"}, - {0: "pending", 1: "pending", 2: "pending"}, - {0: "preparing", 1: "pending", 2: "pending"}, - {0: "prepare_done", 1: "pending", 2: "pending"}, - {0: "running", 1: "pending", 2: "preparing"}, - {0: "running", 1: "pending", 2: "prepare_done"}, - {0: "paused", 1: "pending", 2: "running"}, - {0: "paused", 1: "pending", 2: "run_done"}, - {0: "running", 1: "pending", 2: "analyzing"}, - {0: "running", 1: "pending", 2: "deleting"}, - ] + expect_status = { + 0: ["pending", "preparing", "prepare_done", + "running", "paused", "running"], + 1: ["pending"], + 2: ["pending", "preparing", "prepare_done", + "running", "run_done", "analyzing", "deleting"], + } + + scheduler_mon = SchedulerMonitor(expect_status) done = asyncio.Event() - expect_idx = 0 - skip_next = False def notify(mod): - nonlocal expect_idx, skip_next - process_mod(schedule, mod) - - # gather status of each RID - current_status = {} - for rid, info in schedule.items(): - current_status[rid] = info["status"] - - # skip once after prepare_done or run_done - if skip_next: - skip_next = False - else: - self.assertEqual(current_status, expect[expect_idx]) - expect_idx += 1 - if "prepare_done" in current_status.values() or\ - "run_done" in current_status.values(): - skip_next = True + process_mod(scheduler_mon.schedulers, mod) + scheduler_mon.check_status(self) - if expect_idx >= len(expect): + if scheduler_mon.finished(self): done.set() scheduler.notifier.publish = notify From 1b06ab37994e43ebcb98b7d47868aa56ccdc4f2c Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 13 Jul 2022 16:18:05 +0800 Subject: [PATCH 12/35] test_scheduler: Rename SchedulerMonitor.schedulers to experiments --- artiq/test/test_scheduler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 7382948d67..7e90c78c14 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -86,15 +86,15 @@ def get(self): class SchedulerMonitor(): def __init__(self, expect_status): self.expect_status = expect_status - self.schedulers = {} + self.experiments = {} self.current_status = {} def check_status(self, test): - for key, value in self.schedulers.items(): + for key, value in self.experiments.items(): if key not in self.current_status.keys(): self.current_status[key] = "" - if self.schedulers[key]["status"] != self.current_status[key]: - test.assertEqual(self.schedulers[key]["status"], self.expect_status[key][0]) + if self.experiments[key]["status"] != self.current_status[key]: + test.assertEqual(self.experiments[key]["status"], self.expect_status[key][0]) self.current_status[key] = self.expect_status[key].pop(0) def finished(self, test): @@ -178,7 +178,7 @@ def test_pending_priority(self): done = asyncio.Event() def notify(mod): - process_mod(scheduler_mon.schedulers, mod) + process_mod(scheduler_mon.experiments, mod) scheduler_mon.check_status(self) if scheduler_mon.finished(self): From a148b3e77d77d393b6ff3b1f0aebbbff99e6d3dc Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 13:08:28 +0800 Subject: [PATCH 13/35] SchedulerMonitor: Add records with time element Keep time and status change of each experiment for assert --- artiq/test/test_scheduler.py | 58 +++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 7e90c78c14..497e06b7df 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -84,25 +84,33 @@ def get(self): return rid class SchedulerMonitor(): - def __init__(self, expect_status): - self.expect_status = expect_status + def __init__(self): self.experiments = {} - self.current_status = {} + self.last_status = {} + self.records = {} + self.finished = False - def check_status(self, test): + def record(self): for key, value in self.experiments.items(): - if key not in self.current_status.keys(): - self.current_status[key] = "" - if self.experiments[key]["status"] != self.current_status[key]: - test.assertEqual(self.experiments[key]["status"], self.expect_status[key][0]) - self.current_status[key] = self.expect_status[key].pop(0) - - def finished(self, test): - for value in self.expect_status.values(): - if value: - return False - return True - + if key not in self.last_status.keys(): + self.last_status[key] = "" + self.records[key] = [] + current_status = self.experiments[key]["status"] + if current_status != self.last_status[key]: + self.last_status[key] = current_status + self.records[key].append(time()) + self.records[key].append(current_status) + if current_status == "deleting": + self.finished = True + + def get_in_time(self, rid, status): + return self.records[rid][self.records[rid].index(status)-1] + + def get_out_time(self, rid, status) + if self.records[rid][-1] == status: + return "never" + else: + return self.records[rid][self.records[rid].index(status) + 1] class SchedulerCase(unittest.TestCase): def setUp(self): @@ -165,23 +173,15 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - expect_status = { - 0: ["pending", "preparing", "prepare_done", - "running", "paused", "running"], - 1: ["pending"], - 2: ["pending", "preparing", "prepare_done", - "running", "run_done", "analyzing", "deleting"], - } - - scheduler_mon = SchedulerMonitor(expect_status) + scheduler_mon = SchedulerMonitor() done = asyncio.Event() def notify(mod): process_mod(scheduler_mon.experiments, mod) - scheduler_mon.check_status(self) + scheduler_mon.record() - if scheduler_mon.finished(self): + if scheduler_mon.finished: done.set() scheduler.notifier.publish = notify @@ -196,6 +196,10 @@ def notify(mod): scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) + # Assert + self.assertTrue(scheduler_mon.get_out_time(1, "pending") == "never") + self.assertTrue(scheduler_mon.get_out_time(2, "pending") >= early) + def test_pause(self): loop = self.loop From 0b91f5334919f50f5f5b082c03580abb83c55d9a Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 13:26:19 +0800 Subject: [PATCH 14/35] Add missing colon --- artiq/test/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 497e06b7df..75d2400164 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -106,7 +106,7 @@ def record(self): def get_in_time(self, rid, status): return self.records[rid][self.records[rid].index(status)-1] - def get_out_time(self, rid, status) + def get_out_time(self, rid, status): if self.records[rid][-1] == status: return "never" else: From 99ffd597f0153fd41ba3e2b08285d1facfb72c5b Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 15:25:12 +0800 Subject: [PATCH 15/35] SchedulerMonitor: Add status_record and related assert --- artiq/test/test_scheduler.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 75d2400164..6c87a15312 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -88,6 +88,16 @@ def __init__(self): self.experiments = {} self.last_status = {} self.records = {} + self.status_records = { + "pending": [], + "preparing": [], + "prepare_done": [], + "running": [], + "run_done": [], + "analyzing": [], + "deleting": [], + "paused": [] + } self.finished = False def record(self): @@ -100,6 +110,7 @@ def record(self): self.last_status[key] = current_status self.records[key].append(time()) self.records[key].append(current_status) + self.status_records[current_status].append(key) if current_status == "deleting": self.finished = True @@ -112,6 +123,10 @@ def get_out_time(self, rid, status): else: return self.records[rid][self.records[rid].index(status) + 1] + def get_exp_order(self, status): + return self.status_records[status] + + class SchedulerCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() @@ -125,6 +140,7 @@ def test_steps(self): expect = _get_basic_steps(1, expid) done = asyncio.Event() expect_idx = 0 + def notify(mod): nonlocal expect_idx self.assertEqual(mod, expect[expect_idx]) @@ -197,8 +213,8 @@ def notify(mod): loop.run_until_complete(scheduler.stop()) # Assert - self.assertTrue(scheduler_mon.get_out_time(1, "pending") == "never") - self.assertTrue(scheduler_mon.get_out_time(2, "pending") >= early) + self.assertEqual(scheduler_mon.get_exp_order("preparing"), [0, 2]) + self.assertEqual(scheduler_mon.get_out_time(1, "pending"), "never") def test_pause(self): loop = self.loop From c5f97a05606b10082c1f464d6173fd8e06da7f96 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 15:26:52 +0800 Subject: [PATCH 16/35] Rename scheduler_mon and records --- artiq/test/test_scheduler.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 6c87a15312..7f4f39a233 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -87,7 +87,7 @@ class SchedulerMonitor(): def __init__(self): self.experiments = {} self.last_status = {} - self.records = {} + self.exp_flow = {} self.status_records = { "pending": [], "preparing": [], @@ -104,24 +104,24 @@ def record(self): for key, value in self.experiments.items(): if key not in self.last_status.keys(): self.last_status[key] = "" - self.records[key] = [] + self.exp_flow[key] = [] current_status = self.experiments[key]["status"] if current_status != self.last_status[key]: self.last_status[key] = current_status - self.records[key].append(time()) - self.records[key].append(current_status) + self.exp_flow[key].append(time()) + self.exp_flow[key].append(current_status) self.status_records[current_status].append(key) if current_status == "deleting": self.finished = True def get_in_time(self, rid, status): - return self.records[rid][self.records[rid].index(status)-1] + return self.exp_flow[rid][self.exp_flow[rid].index(status)-1] def get_out_time(self, rid, status): - if self.records[rid][-1] == status: + if self.exp_flow[rid][-1] == status: return "never" else: - return self.records[rid][self.records[rid].index(status) + 1] + return self.exp_flow[rid][self.exp_flow[rid].index(status) + 1] def get_exp_order(self, status): return self.status_records[status] @@ -189,15 +189,15 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - scheduler_mon = SchedulerMonitor() + monitor = SchedulerMonitor() done = asyncio.Event() def notify(mod): - process_mod(scheduler_mon.experiments, mod) - scheduler_mon.record() + process_mod(monitor.experiments, mod) + monitor.record() - if scheduler_mon.finished: + if monitor.finished: done.set() scheduler.notifier.publish = notify @@ -213,8 +213,8 @@ def notify(mod): loop.run_until_complete(scheduler.stop()) # Assert - self.assertEqual(scheduler_mon.get_exp_order("preparing"), [0, 2]) - self.assertEqual(scheduler_mon.get_out_time(1, "pending"), "never") + self.assertEqual(monitor.get_exp_order("preparing"), [0, 2]) + self.assertEqual(monitor.get_out_time(1, "pending"), "never") def test_pause(self): loop = self.loop From b52d9fa9f0ff9167b6aa36ea2cf56c8cf90a6d53 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 16:04:12 +0800 Subject: [PATCH 17/35] SchedulerMonitor: Add get_status_order it return a list of status the rid went through --- artiq/test/test_scheduler.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 7f4f39a233..1d7d6a3180 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -126,6 +126,13 @@ def get_out_time(self, rid, status): def get_exp_order(self, status): return self.status_records[status] + def get_status_order(self, rid): + result_list = self.exp_flow[rid].copy() + for item in result_list: + if isinstance(item, float): + result_list.remove(item) + return result_list + class SchedulerCase(unittest.TestCase): def setUp(self): @@ -214,7 +221,12 @@ def notify(mod): # Assert self.assertEqual(monitor.get_exp_order("preparing"), [0, 2]) - self.assertEqual(monitor.get_out_time(1, "pending"), "never") + self.assertEqual(monitor.get_out_time(1, "pending"), "never", + "RID 1 has left pending") + basic_flow = ["pending", "preparing", "prepare_done", "running", + "run_done", "analyzing", "deleting"] + self.assertEqual(monitor.get_status_order(2), basic_flow, + "RID 2 did not go through all stage") def test_pause(self): loop = self.loop From 6c42c465aadccca31c1d471c78406da272cabfa4 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 14 Jul 2022 16:51:19 +0800 Subject: [PATCH 18/35] SchedulerMonitor: Add end_condition args --- artiq/test/test_scheduler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 1d7d6a3180..0607a47df2 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -8,6 +8,8 @@ from artiq.master.scheduler import Scheduler from sipyco.sync_struct import process_mod +basic_flow = ["pending", "preparing", "prepare_done", "running", + "run_done", "analyzing", "deleting"] class EmptyExperiment(EnvExperiment): def build(self): @@ -84,7 +86,7 @@ def get(self): return rid class SchedulerMonitor(): - def __init__(self): + def __init__(self, end_condition = "deleting"): self.experiments = {} self.last_status = {} self.exp_flow = {} @@ -99,6 +101,7 @@ def __init__(self): "paused": [] } self.finished = False + self.end_condition = end_condition def record(self): for key, value in self.experiments.items(): @@ -111,8 +114,10 @@ def record(self): self.exp_flow[key].append(time()) self.exp_flow[key].append(current_status) self.status_records[current_status].append(key) - if current_status == "deleting": - self.finished = True + + if current_status == self.end_condition: + self.finished = True + return def get_in_time(self, rid, status): return self.exp_flow[rid][self.exp_flow[rid].index(status)-1] From 6c065126941c6a8a3eda4bfc8719b5f36d1131d8 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 15 Jul 2022 16:52:55 +0800 Subject: [PATCH 19/35] Remove () after SchedulerMonitor --- artiq/test/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 0607a47df2..d5ec66686c 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -85,7 +85,7 @@ def get(self): self._next_rid += 1 return rid -class SchedulerMonitor(): +class SchedulerMonitor: def __init__(self, end_condition = "deleting"): self.experiments = {} self.last_status = {} From f8f0d3480a094924cb20014484ad1c8e0bdf54f1 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 15 Jul 2022 17:29:28 +0800 Subject: [PATCH 20/35] SchedulerMonitor: Structure exp_flow --- artiq/test/test_scheduler.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index d5ec66686c..91814ffa5b 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -111,8 +111,13 @@ def record(self): current_status = self.experiments[key]["status"] if current_status != self.last_status[key]: self.last_status[key] = current_status - self.exp_flow[key].append(time()) - self.exp_flow[key].append(current_status) + if self.exp_flow[key]: + self.exp_flow[key][-1]["out_time"] = time() + self.exp_flow[key].append({ + "status": current_status, + "in_time": time(), + "out_time": "never" + }) self.status_records[current_status].append(key) if current_status == self.end_condition: @@ -120,23 +125,20 @@ def record(self): return def get_in_time(self, rid, status): - return self.exp_flow[rid][self.exp_flow[rid].index(status)-1] + for step in self.exp_flow[rid]: + if step["status"] == status: + return step["in_time"] def get_out_time(self, rid, status): - if self.exp_flow[rid][-1] == status: - return "never" - else: - return self.exp_flow[rid][self.exp_flow[rid].index(status) + 1] + for step in self.exp_flow[rid]: + if step["status"] == status: + return step["out_time"] def get_exp_order(self, status): return self.status_records[status] def get_status_order(self, rid): - result_list = self.exp_flow[rid].copy() - for item in result_list: - if isinstance(item, float): - result_list.remove(item) - return result_list + return [step["status"] for step in self.exp_flow[rid]] class SchedulerCase(unittest.TestCase): From 8c2501f6ab9f4524e805c178a421bef458c84efe Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Mon, 18 Jul 2022 14:44:58 +0800 Subject: [PATCH 21/35] SchedulerMonitor: Add wait_until() and flags Target a particular status of a rid and set the asyncio.Event() --- artiq/test/test_scheduler.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 91814ffa5b..d5e4537d93 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -99,9 +99,10 @@ def __init__(self, end_condition = "deleting"): "analyzing": [], "deleting": [], "paused": [] - } + } self.finished = False self.end_condition = end_condition + self.flags = {"arrive": {}, "leave": {}} def record(self): for key, value in self.experiments.items(): @@ -120,6 +121,15 @@ def record(self): }) self.status_records[current_status].append(key) + if key in self.flags["arrive"].keys(): + if current_status in self.flags["arrive"][key].keys(): + self.flags["arrive"][key][current_status].set() + self.remove_flag(key, "arrive", current_status) + if key in self.flags["leave"].keys(): + if self.last_status[key] in self.flags["leave"][key].keys(): + self.flags["leave"][key][current_status].set() + self.remove_flag(key, "leave", current_status) + if current_status == self.end_condition: self.finished = True return @@ -140,6 +150,24 @@ def get_exp_order(self, status): def get_status_order(self, rid): return [step["status"] for step in self.exp_flow[rid]] + async def wait_until(self, rid, condition, status): + # condition : "arrive", "leave" + if self.last_status[rid] == status: + return + if rid not in self.flags[condition] or\ + status not in self.flags[condition][rid]: + self.add_flag(rid, condition, status) + await self.flags[condition][rid][status].wait() + + def add_flag(self, rid, condition, status): + if rid not in self.flags[condition]: + self.flags[condition][rid] = {} + self.flags[condition][rid][status] = asyncio.Event() + + def remove_flag(self, rid, condition, status): + del self.flags[condition][rid][status] + if not self.flags[condition][rid]: + del self.flags[condition][rid] class SchedulerCase(unittest.TestCase): def setUp(self): From bdad183a120966bb492d96f06056efd06502ceec Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Mon, 18 Jul 2022 14:55:53 +0800 Subject: [PATCH 22/35] test_pending_priority: Replace done with monitor.wait_until --- artiq/test/test_scheduler.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index d5e4537d93..cf8c4b977f 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -233,15 +233,10 @@ def test_pending_priority(self): monitor = SchedulerMonitor() - done = asyncio.Event() - def notify(mod): process_mod(monitor.experiments, mod) monitor.record() - if monitor.finished: - done.set() - scheduler.notifier.publish = notify scheduler.start() @@ -250,7 +245,7 @@ def notify(mod): scheduler.submit("main", expid_empty, high_priority, late) scheduler.submit("main", expid_empty, middle_priority, early) - loop.run_until_complete(done.wait()) + loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting")) scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) @@ -258,10 +253,6 @@ def notify(mod): self.assertEqual(monitor.get_exp_order("preparing"), [0, 2]) self.assertEqual(monitor.get_out_time(1, "pending"), "never", "RID 1 has left pending") - basic_flow = ["pending", "preparing", "prepare_done", "running", - "run_done", "analyzing", "deleting"] - self.assertEqual(monitor.get_status_order(2), basic_flow, - "RID 2 did not go through all stage") def test_pause(self): loop = self.loop From d7630a9451e5711ca8698e4e1c9ea7dc71e967bd Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Mon, 18 Jul 2022 17:52:51 +0800 Subject: [PATCH 23/35] test_scheduler: Use monitor.wait_until and assert with basic_flow --- artiq/test/test_scheduler.py | 121 +++++++++-------------------------- 1 file changed, 32 insertions(+), 89 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index cf8c4b977f..4592c22528 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -98,7 +98,8 @@ def __init__(self, end_condition = "deleting"): "run_done": [], "analyzing": [], "deleting": [], - "paused": [] + "paused": [], + "flushing": [] } self.finished = False self.end_condition = end_condition @@ -178,17 +179,11 @@ def test_steps(self): loop = self.loop scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") - - expect = _get_basic_steps(1, expid) - done = asyncio.Event() - expect_idx = 0 + monitor = SchedulerMonitor() def notify(mod): - nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() + process_mod(monitor.experiments, mod) + monitor.record() scheduler.notifier.publish = notify scheduler.start() @@ -196,18 +191,14 @@ def notify(mod): # Verify that a timed experiment far in the future does not # get run, even if it has high priority. late = time() + 100000 - expect.insert(0, - {"action": "setitem", "key": 0, "value": - {"pipeline": "main", "status": "pending", "priority": 99, - "expid": expid, "due_date": late, "flush": False, - "repo_msg": None}, - "path": []}) scheduler.submit("main", expid, 99, late, False) # This one (RID 1) gets run instead. scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(done.wait()) + loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) + self.assertEqual(monitor.get_status_order(1), basic_flow) + self.assertEqual(monitor.last_status[0], "pending") scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) @@ -273,52 +264,29 @@ def check_termination(mod): expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - expect = _get_basic_steps(1, expid) - background_running = asyncio.Event() - empty_ready = asyncio.Event() - empty_completed = asyncio.Event() - background_completed = asyncio.Event() - expect_idx = 0 + monitor = SchedulerMonitor() def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - empty_completed.set() + process_mod(monitor.experiments, mod) + monitor.record() scheduler.notifier.publish = notify scheduler.start() scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) + loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) self.assertFalse(scheduler.check_pause(0)) scheduler.submit("main", expid, 0, None, False) self.assertFalse(scheduler.check_pause(0)) - loop.run_until_complete(empty_ready.wait()) + loop.run_until_complete(monitor.wait_until(1, "arrive", "prepare_done")) self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(empty_completed.wait()) + loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) self.assertFalse(scheduler.check_pause(0)) self.assertFalse(termination_ok) scheduler.request_termination(0) self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(background_completed.wait()) + loop.run_until_complete(monitor.wait_until(0, "arrive", "deleting")) self.assertTrue(termination_ok) + self.assertEqual(monitor.get_status_order(1), basic_flow) loop.run_until_complete(scheduler.stop()) @@ -331,35 +299,21 @@ def test_close_with_active_runs(self): expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL + monitor = SchedulerMonitor() expid = _get_expid("EmptyExperiment") - background_running = asyncio.Event() - empty_ready = asyncio.Event() - background_completed = asyncio.Event() def notify(mod): - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() + process_mod(monitor.experiments, mod) + monitor.record() + scheduler.notifier.publish = notify scheduler.start() scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) + loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(empty_ready.wait()) + loop.run_until_complete(monitor.wait_until(1, "arrive", "prepare_done")) # At this point, (at least) BackgroundExperiment is still running; make # sure we can stop the scheduler without hanging. @@ -369,34 +323,23 @@ def test_flush(self): loop = self.loop scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") + monitor = SchedulerMonitor() + + expect_flow = basic_flow.copy() + expect_flow.insert(1, "flushing") - expect = _get_basic_steps(1, expid, 1, True) - expect.insert(1, {"key": "status", - "path": [1], - "value": "flushing", - "action": "setitem"}) - first_preparing = asyncio.Event() - done = asyncio.Event() - expect_idx = 0 def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "preparing", - "key": "status", - "action": "setitem"}: - first_preparing.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() + process_mod(monitor.experiments, mod) + monitor.record() + scheduler.notifier.publish = notify scheduler.start() scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(first_preparing.wait()) + loop.run_until_complete(monitor.wait_until(0, "arrive", "prepare_done")) scheduler.submit("main", expid, 1, None, True) - loop.run_until_complete(done.wait()) + loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) + self.assertEqual(monitor.get_status_order(1), expect_flow) loop.run_until_complete(scheduler.stop()) def tearDown(self): From f9bce0c7f1ed892b7b9bd87089f49b9fe3740694 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 20 Jul 2022 10:07:53 +0800 Subject: [PATCH 24/35] test_scheduler: remove _get_basic_steps It has been replaced by basic_flow --- artiq/test/test_scheduler.py | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 4592c22528..e9ac406ca5 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -53,29 +53,6 @@ def _get_expid(name): } -def _get_basic_steps(rid, expid, priority=0, flush=False, due_date=None): - return [ - {"action": "setitem", "key": rid, "value": - {"pipeline": "main", "status": "pending", "priority": priority, - "expid": expid, "due_date": due_date, "flush": flush, - "repo_msg": None}, - "path": []}, - {"action": "setitem", "key": "status", "value": "preparing", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "prepare_done", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "running", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "run_done", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "analyzing", - "path": [rid]}, - {"action": "setitem", "key": "status", "value": "deleting", - "path": [rid]}, - {"action": "delitem", "key": rid, "path": []} - ] - - class _RIDCounter: def __init__(self, next_rid): self._next_rid = next_rid @@ -159,7 +136,7 @@ async def wait_until(self, rid, condition, status): status not in self.flags[condition][rid]: self.add_flag(rid, condition, status) await self.flags[condition][rid][status].wait() - + def add_flag(self, rid, condition, status): if rid not in self.flags[condition]: self.flags[condition][rid] = {} @@ -324,7 +301,7 @@ def test_flush(self): scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") monitor = SchedulerMonitor() - + expect_flow = basic_flow.copy() expect_flow.insert(1, "flushing") From 566909d847fb1eb4bb49ad2fd55e3581fd6ef2a5 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Wed, 20 Jul 2022 18:27:19 +0800 Subject: [PATCH 25/35] test_flush: test with different priority case --- artiq/test/test_scheduler.py | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index e9ac406ca5..88b0db3be1 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -9,7 +9,10 @@ from sipyco.sync_struct import process_mod basic_flow = ["pending", "preparing", "prepare_done", "running", - "run_done", "analyzing", "deleting"] + "run_done", "analyzing", "deleting"] + +flush_flow = ["pending", "flushing", "preparing", "prepare_done", + "running", "run_done", "analyzing", "deleting"] class EmptyExperiment(EnvExperiment): def build(self): @@ -63,7 +66,7 @@ def get(self): return rid class SchedulerMonitor: - def __init__(self, end_condition = "deleting"): + def __init__(self, end_condition="deleting"): self.experiments = {} self.last_status = {} self.exp_flow = {} @@ -298,13 +301,14 @@ def notify(mod): def test_flush(self): loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None, None) + handlers = {} + scheduler = Scheduler(_RIDCounter(0), handlers, None, None) + handlers["scheduler_check_pause"] = scheduler.check_pause expid = _get_expid("EmptyExperiment") + expid_bg = _get_expid("CheckPauseBackgroundExperiment") + expid_bg["log_level"] = logging.CRITICAL monitor = SchedulerMonitor() - expect_flow = basic_flow.copy() - expect_flow.insert(1, "flushing") - def notify(mod): process_mod(monitor.experiments, mod) monitor.record() @@ -312,11 +316,24 @@ def notify(mod): scheduler.notifier.publish = notify scheduler.start() + # Flush with same priority scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(monitor.wait_until(0, "arrive", "prepare_done")) - scheduler.submit("main", expid, 1, None, True) + scheduler.submit("main", expid, 0, None, True) + loop.run_until_complete(monitor.wait_until(1, "arrive", "preparing")) + self.assertEqual(monitor.last_status[0], "deleting") loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(1), expect_flow) + self.assertEqual(monitor.get_status_order(0), basic_flow) + self.assertEqual(monitor.get_status_order(1), flush_flow) + + # Flush with higher priority + scheduler.submit("main", expid_bg, 0, None, False) + # Make sure RID 2 go into preparing stage first + loop.run_until_complete(monitor.wait_until(2, "arrive", "preparing")) + scheduler.submit("main", expid, 1, None, True) + loop.run_until_complete(monitor.wait_until(3, "arrive", "deleting")) + self.assertEqual(monitor.last_status[2], "running") + self.assertEqual(monitor.get_status_order(3), flush_flow) + loop.run_until_complete(scheduler.stop()) def tearDown(self): From 4357546f950464cb1d18dff0c2c3c62f2a13e30d Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 21 Jul 2022 10:45:11 +0800 Subject: [PATCH 26/35] test_pause: Add comments and assert before request termination --- artiq/test/test_scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 88b0db3be1..160e3042df 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -251,6 +251,7 @@ def notify(mod): scheduler.notifier.publish = notify scheduler.start() + # check_pause is True when rid with higher priority is prepare_done scheduler.submit("main", expid_bg, -99, None, False) loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) self.assertFalse(scheduler.check_pause(0)) @@ -260,13 +261,15 @@ def notify(mod): self.assertTrue(scheduler.check_pause(0)) loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) self.assertFalse(scheduler.check_pause(0)) + self.assertEqual(monitor.get_status_order(1), basic_flow) + # check_pause is True when request_termination is called self.assertFalse(termination_ok) + self.assertFalse(scheduler.check_pause(0)) scheduler.request_termination(0) self.assertTrue(scheduler.check_pause(0)) loop.run_until_complete(monitor.wait_until(0, "arrive", "deleting")) self.assertTrue(termination_ok) - self.assertEqual(monitor.get_status_order(1), basic_flow) loop.run_until_complete(scheduler.stop()) From c06a0908d78283120ef4cf17ed2b4056dccf69b5 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Thu, 21 Jul 2022 13:27:50 +0800 Subject: [PATCH 27/35] test_pending: Use asyncio.wait and update monitor asyncio.wait to wait either one rid leave pending stage Update wait_until and record in monitor for successful callback from leave stage --- artiq/test/test_scheduler.py | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 160e3042df..ac54204c9e 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -92,7 +92,6 @@ def record(self): self.exp_flow[key] = [] current_status = self.experiments[key]["status"] if current_status != self.last_status[key]: - self.last_status[key] = current_status if self.exp_flow[key]: self.exp_flow[key][-1]["out_time"] = time() self.exp_flow[key].append({ @@ -105,14 +104,13 @@ def record(self): if key in self.flags["arrive"].keys(): if current_status in self.flags["arrive"][key].keys(): self.flags["arrive"][key][current_status].set() - self.remove_flag(key, "arrive", current_status) if key in self.flags["leave"].keys(): if self.last_status[key] in self.flags["leave"][key].keys(): - self.flags["leave"][key][current_status].set() - self.remove_flag(key, "leave", current_status) + self.flags["leave"][key][self.last_status[key]].set() if current_status == self.end_condition: self.finished = True + self.last_status[key] = current_status return def get_in_time(self, rid, status): @@ -133,12 +131,13 @@ def get_status_order(self, rid): async def wait_until(self, rid, condition, status): # condition : "arrive", "leave" - if self.last_status[rid] == status: + if self.last_status[rid] == status and condition == "arrive": return if rid not in self.flags[condition] or\ - status not in self.flags[condition][rid]: + status not in self.flags[condition][rid]: self.add_flag(rid, condition, status) await self.flags[condition][rid][status].wait() + self.remove_flag(rid, condition, status) def add_flag(self, rid, condition, status): if rid not in self.flags[condition]: @@ -146,9 +145,11 @@ def add_flag(self, rid, condition, status): self.flags[condition][rid][status] = asyncio.Event() def remove_flag(self, rid, condition, status): - del self.flags[condition][rid][status] - if not self.flags[condition][rid]: - del self.flags[condition][rid] + if rid in self.flags[condition].keys(): + if status in self.flags[condition][rid].keys(): + del self.flags[condition][rid][status] + if not self.flags[condition][rid]: + del self.flags[condition][rid] class SchedulerCase(unittest.TestCase): def setUp(self): @@ -216,15 +217,22 @@ def notify(mod): scheduler.submit("main", expid_empty, high_priority, late) scheduler.submit("main", expid_empty, middle_priority, early) + wait_RID1_leave = loop.create_task( + monitor.wait_until(1, "leave", "pending")) + wait_RID2_leave = loop.create_task( + monitor.wait_until(2, "leave", "pending")) + done, pending = loop.run_until_complete(asyncio.wait( + [wait_RID1_leave, wait_RID2_leave], + return_when=asyncio.FIRST_COMPLETED)) + self.assertIn(wait_RID2_leave, done) + for task in pending: + task.cancel() + loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting")) + self.assertEqual(monitor.get_status_order(2), basic_flow) scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) - # Assert - self.assertEqual(monitor.get_exp_order("preparing"), [0, 2]) - self.assertEqual(monitor.get_out_time(1, "pending"), "never", - "RID 1 has left pending") - def test_pause(self): loop = self.loop From cd0465b7dbb4c8ee71bfbaf1610f938fb07a6604 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 29 Jul 2022 11:05:11 +0800 Subject: [PATCH 28/35] Scheduler: Remove status_records, get_in_time and get_out_time status_records: Information are stored but never used. Plus it can be computed from exp_flow afterward get_time functions are never used. --- artiq/test/test_scheduler.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index ac54204c9e..96d7beb14e 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -70,17 +70,6 @@ def __init__(self, end_condition="deleting"): self.experiments = {} self.last_status = {} self.exp_flow = {} - self.status_records = { - "pending": [], - "preparing": [], - "prepare_done": [], - "running": [], - "run_done": [], - "analyzing": [], - "deleting": [], - "paused": [], - "flushing": [] - } self.finished = False self.end_condition = end_condition self.flags = {"arrive": {}, "leave": {}} @@ -99,7 +88,6 @@ def record(self): "in_time": time(), "out_time": "never" }) - self.status_records[current_status].append(key) if key in self.flags["arrive"].keys(): if current_status in self.flags["arrive"][key].keys(): @@ -113,19 +101,6 @@ def record(self): self.last_status[key] = current_status return - def get_in_time(self, rid, status): - for step in self.exp_flow[rid]: - if step["status"] == status: - return step["in_time"] - - def get_out_time(self, rid, status): - for step in self.exp_flow[rid]: - if step["status"] == status: - return step["out_time"] - - def get_exp_order(self, status): - return self.status_records[status] - def get_status_order(self, rid): return [step["status"] for step in self.exp_flow[rid]] From ada6d2734a4b7b6339e1d009b5024289f49ee13b Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 29 Jul 2022 11:23:56 +0800 Subject: [PATCH 29/35] scheduler: Remove end_condition and bool finished It was introduced as a condition to end scheduler which is no longer necessary --- artiq/test/test_scheduler.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 96d7beb14e..038ad9514e 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -66,12 +66,10 @@ def get(self): return rid class SchedulerMonitor: - def __init__(self, end_condition="deleting"): + def __init__(self): self.experiments = {} self.last_status = {} self.exp_flow = {} - self.finished = False - self.end_condition = end_condition self.flags = {"arrive": {}, "leave": {}} def record(self): @@ -96,8 +94,6 @@ def record(self): if self.last_status[key] in self.flags["leave"][key].keys(): self.flags["leave"][key][self.last_status[key]].set() - if current_status == self.end_condition: - self.finished = True self.last_status[key] = current_status return From 0efa3dbb2cf3d1cabb09d7fdd6cf56c6eb433b24 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 29 Jul 2022 11:37:35 +0800 Subject: [PATCH 30/35] scheduler: Replace notify by monitor.record --- artiq/test/test_scheduler.py | 38 +++++++----------------------------- 1 file changed, 7 insertions(+), 31 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 038ad9514e..cc3860921c 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -72,7 +72,8 @@ def __init__(self): self.exp_flow = {} self.flags = {"arrive": {}, "leave": {}} - def record(self): + def record(self, mod): + process_mod(self.experiments, mod) for key, value in self.experiments.items(): if key not in self.last_status.keys(): self.last_status[key] = "" @@ -132,12 +133,7 @@ def test_steps(self): scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") monitor = SchedulerMonitor() - - def notify(mod): - process_mod(monitor.experiments, mod) - monitor.record() - scheduler.notifier.publish = notify - + scheduler.notifier.publish = monitor.record scheduler.start() # Verify that a timed experiment far in the future does not @@ -175,13 +171,7 @@ def test_pending_priority(self): early = time() + 1 monitor = SchedulerMonitor() - - def notify(mod): - process_mod(monitor.experiments, mod) - monitor.record() - - scheduler.notifier.publish = notify - + scheduler.notifier.publish = monitor.record scheduler.start() scheduler.submit("main", expid_bg, low_priority) @@ -224,11 +214,7 @@ def check_termination(mod): expid = _get_expid("EmptyExperiment") monitor = SchedulerMonitor() - def notify(mod): - process_mod(monitor.experiments, mod) - monitor.record() - scheduler.notifier.publish = notify - + scheduler.notifier.publish = monitor.record scheduler.start() # check_pause is True when rid with higher priority is prepare_done scheduler.submit("main", expid_bg, -99, None, False) @@ -264,12 +250,7 @@ def test_close_with_active_runs(self): monitor = SchedulerMonitor() expid = _get_expid("EmptyExperiment") - def notify(mod): - process_mod(monitor.experiments, mod) - monitor.record() - - scheduler.notifier.publish = notify - + scheduler.notifier.publish = monitor.record scheduler.start() scheduler.submit("main", expid_bg, -99, None, False) loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) @@ -291,12 +272,7 @@ def test_flush(self): expid_bg["log_level"] = logging.CRITICAL monitor = SchedulerMonitor() - def notify(mod): - process_mod(monitor.experiments, mod) - monitor.record() - - scheduler.notifier.publish = notify - + scheduler.notifier.publish = monitor.record scheduler.start() # Flush with same priority scheduler.submit("main", expid, 0, None, False) From ee7d6b0a365be27e6e27b1a937428bcabc96d64f Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 29 Jul 2022 15:19:32 +0800 Subject: [PATCH 31/35] SchedulerMonitor: Apply flow check in record --- artiq/test/test_scheduler.py | 52 +++++++++++++++--------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index cc3860921c..dd1d95cde2 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -8,11 +8,6 @@ from artiq.master.scheduler import Scheduler from sipyco.sync_struct import process_mod -basic_flow = ["pending", "preparing", "prepare_done", "running", - "run_done", "analyzing", "deleting"] - -flush_flow = ["pending", "flushing", "preparing", "prepare_done", - "running", "run_done", "analyzing", "deleting"] class EmptyExperiment(EnvExperiment): def build(self): @@ -66,27 +61,32 @@ def get(self): return rid class SchedulerMonitor: - def __init__(self): + def __init__(self, test): + self.test = test self.experiments = {} self.last_status = {} - self.exp_flow = {} self.flags = {"arrive": {}, "leave": {}} + self.flow_map = { # current status -> possible move + "": {"pending"}, + "pending": {"preparing", "flushing", "deleting"}, + "preparing": {"prepare_done", "deleting"}, + "prepare_done": {"running", "deleting"}, + "running": {"run_done", "paused", "deleting"}, + "run_done": {"analyzing", "deleting"}, + "analyzing": {"deleting"}, + "deleting": {}, + "paused": {"running"}, + "flushing": {"preparing"} + } def record(self, mod): process_mod(self.experiments, mod) for key, value in self.experiments.items(): if key not in self.last_status.keys(): self.last_status[key] = "" - self.exp_flow[key] = [] current_status = self.experiments[key]["status"] if current_status != self.last_status[key]: - if self.exp_flow[key]: - self.exp_flow[key][-1]["out_time"] = time() - self.exp_flow[key].append({ - "status": current_status, - "in_time": time(), - "out_time": "never" - }) + self.test.assertIn(current_status, self.flow_map[self.last_status[key]]) if key in self.flags["arrive"].keys(): if current_status in self.flags["arrive"][key].keys(): @@ -98,9 +98,6 @@ def record(self, mod): self.last_status[key] = current_status return - def get_status_order(self, rid): - return [step["status"] for step in self.exp_flow[rid]] - async def wait_until(self, rid, condition, status): # condition : "arrive", "leave" if self.last_status[rid] == status and condition == "arrive": @@ -123,6 +120,7 @@ def remove_flag(self, rid, condition, status): if not self.flags[condition][rid]: del self.flags[condition][rid] + class SchedulerCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() @@ -132,7 +130,7 @@ def test_steps(self): loop = self.loop scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -145,9 +143,7 @@ def test_steps(self): scheduler.submit("main", expid, 0, None, False) loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(1), basic_flow) self.assertEqual(monitor.last_status[0], "pending") - scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) def test_pending_priority(self): @@ -170,7 +166,7 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -190,8 +186,6 @@ def test_pending_priority(self): task.cancel() loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(2), basic_flow) - scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) def test_pause(self): @@ -213,7 +207,7 @@ def check_termination(mod): expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() # check_pause is True when rid with higher priority is prepare_done @@ -226,7 +220,6 @@ def check_termination(mod): self.assertTrue(scheduler.check_pause(0)) loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) self.assertFalse(scheduler.check_pause(0)) - self.assertEqual(monitor.get_status_order(1), basic_flow) # check_pause is True when request_termination is called self.assertFalse(termination_ok) @@ -247,7 +240,7 @@ def test_close_with_active_runs(self): expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) expid = _get_expid("EmptyExperiment") scheduler.notifier.publish = monitor.record @@ -270,7 +263,7 @@ def test_flush(self): expid = _get_expid("EmptyExperiment") expid_bg = _get_expid("CheckPauseBackgroundExperiment") expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -280,8 +273,6 @@ def test_flush(self): loop.run_until_complete(monitor.wait_until(1, "arrive", "preparing")) self.assertEqual(monitor.last_status[0], "deleting") loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(0), basic_flow) - self.assertEqual(monitor.get_status_order(1), flush_flow) # Flush with higher priority scheduler.submit("main", expid_bg, 0, None, False) @@ -290,7 +281,6 @@ def test_flush(self): scheduler.submit("main", expid, 1, None, True) loop.run_until_complete(monitor.wait_until(3, "arrive", "deleting")) self.assertEqual(monitor.last_status[2], "running") - self.assertEqual(monitor.get_status_order(3), flush_flow) loop.run_until_complete(scheduler.stop()) From e5bcbdf8126ddac2a31288d087fd9be93aef0443 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 29 Jul 2022 17:32:17 +0800 Subject: [PATCH 32/35] Scheduler: Rename var for better understanding --- artiq/test/test_scheduler.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index dd1d95cde2..34f9b9582e 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -81,21 +81,20 @@ def __init__(self, test): def record(self, mod): process_mod(self.experiments, mod) - for key, value in self.experiments.items(): - if key not in self.last_status.keys(): - self.last_status[key] = "" - current_status = self.experiments[key]["status"] - if current_status != self.last_status[key]: - self.test.assertIn(current_status, self.flow_map[self.last_status[key]]) - - if key in self.flags["arrive"].keys(): - if current_status in self.flags["arrive"][key].keys(): - self.flags["arrive"][key][current_status].set() - if key in self.flags["leave"].keys(): - if self.last_status[key] in self.flags["leave"][key].keys(): - self.flags["leave"][key][self.last_status[key]].set() - - self.last_status[key] = current_status + for rid, exp_info in self.experiments.items(): + if rid not in self.last_status.keys(): + self.last_status[rid] = "" + if exp_info["status"] != self.last_status[rid]: + self.test.assertIn(exp_info["status"], self.flow_map[self.last_status[rid]]) + + if rid in self.flags["arrive"].keys(): + if exp_info["status"] in self.flags["arrive"][rid].keys(): + self.flags["arrive"][rid][exp_info["status"]].set() + if rid in self.flags["leave"].keys(): + if self.last_status[rid] in self.flags["leave"][rid].keys(): + self.flags["leave"][rid][self.last_status[rid]].set() + + self.last_status[rid] = exp_info["status"] return async def wait_until(self, rid, condition, status): From 36a36c38da203d52b60ea92200c7e24e2cfdb221 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Mon, 1 Aug 2022 13:37:01 +0800 Subject: [PATCH 33/35] Scheduler_monitor: Put flow_map into class attribute --- artiq/test/test_scheduler.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 34f9b9582e..7531e28705 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -61,12 +61,7 @@ def get(self): return rid class SchedulerMonitor: - def __init__(self, test): - self.test = test - self.experiments = {} - self.last_status = {} - self.flags = {"arrive": {}, "leave": {}} - self.flow_map = { # current status -> possible move + flow_map = { # current status -> possible move "": {"pending"}, "pending": {"preparing", "flushing", "deleting"}, "preparing": {"prepare_done", "deleting"}, @@ -79,6 +74,12 @@ def __init__(self, test): "flushing": {"preparing"} } + def __init__(self, test): + self.test = test + self.experiments = {} + self.last_status = {} + self.flags = {"arrive": {}, "leave": {}} + def record(self, mod): process_mod(self.experiments, mod) for rid, exp_info in self.experiments.items(): From a6df1edd72fd95bcc386b412e9e63731a259e281 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 5 Aug 2022 15:09:10 +0800 Subject: [PATCH 34/35] test_scheduler: Put duplicated code to setUp() and tearDown() --- artiq/test/test_scheduler.py | 138 ++++++++++++++--------------------- 1 file changed, 54 insertions(+), 84 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index 7531e28705..ec97613d3f 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -60,6 +60,7 @@ def get(self): self._next_rid += 1 return rid + class SchedulerMonitor: flow_map = { # current status -> possible move "": {"pending"}, @@ -86,7 +87,8 @@ def record(self, mod): if rid not in self.last_status.keys(): self.last_status[rid] = "" if exp_info["status"] != self.last_status[rid]: - self.test.assertIn(exp_info["status"], self.flow_map[self.last_status[rid]]) + self.test.assertIn(exp_info["status"], + self.flow_map[self.last_status[rid]]) if rid in self.flags["arrive"].keys(): if exp_info["status"] in self.flags["arrive"][rid].keys(): @@ -107,6 +109,7 @@ async def wait_until(self, rid, condition, status): self.add_flag(rid, condition, status) await self.flags[condition][rid][status].wait() self.remove_flag(rid, condition, status) + return rid def add_flag(self, rid, condition, status): if rid not in self.flags[condition]: @@ -125,34 +128,30 @@ class SchedulerCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) + self.handlers = {} + self.scheduler = Scheduler(_RIDCounter(0), self.handlers, None, None) + self.monitor = SchedulerMonitor(self) + self.scheduler.notifier.publish = self.monitor.record + self.scheduler.start() def test_steps(self): - loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor(self) - scheduler.notifier.publish = monitor.record - scheduler.start() # Verify that a timed experiment far in the future does not # get run, even if it has high priority. late = time() + 100000 - scheduler.submit("main", expid, 99, late, False) + self.scheduler.submit("main", expid, 99, late, False) # This one (RID 1) gets run instead. - scheduler.submit("main", expid, 0, None, False) + self.scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.last_status[0], "pending") - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) + self.assertEqual(self.monitor.last_status[0], "pending") def test_pending_priority(self): """Check due dates take precedence over priorities when waiting to prepare.""" - loop = self.loop - handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) - handlers["scheduler_check_pause"] = scheduler.check_pause + self.handlers["scheduler_check_pause"] = self.scheduler.check_pause expid_empty = _get_expid("EmptyExperiment") @@ -166,32 +165,26 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - monitor = SchedulerMonitor(self) - scheduler.notifier.publish = monitor.record - scheduler.start() - - scheduler.submit("main", expid_bg, low_priority) - scheduler.submit("main", expid_empty, high_priority, late) - scheduler.submit("main", expid_empty, middle_priority, early) + self.scheduler.submit("main", expid_bg, low_priority) + self.scheduler.submit("main", expid_empty, high_priority, late) + self.scheduler.submit("main", expid_empty, middle_priority, early) - wait_RID1_leave = loop.create_task( - monitor.wait_until(1, "leave", "pending")) - wait_RID2_leave = loop.create_task( - monitor.wait_until(2, "leave", "pending")) - done, pending = loop.run_until_complete(asyncio.wait( + wait_RID1_leave = self.loop.create_task( + self.monitor.wait_until(1, "leave", "pending")) + wait_RID2_leave = self.loop.create_task( + self.monitor.wait_until(2, "leave", "pending")) + done, pending = self.loop.run_until_complete(asyncio.wait( [wait_RID1_leave, wait_RID2_leave], return_when=asyncio.FIRST_COMPLETED)) self.assertIn(wait_RID2_leave, done) for task in pending: task.cancel() - loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting")) - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.monitor.wait_until(2, "arrive", "deleting")) def test_pause(self): - loop = self.loop - termination_ok = False + def check_termination(mod): nonlocal termination_ok self.assertEqual( @@ -199,90 +192,67 @@ def check_termination(mod): {"action": "setitem", "key": "termination_ok", "value": (False, True), "path": []}) termination_ok = True - handlers = { - "update_dataset": check_termination - } - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) + self.handlers["update_dataset"] = check_termination expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor(self) - scheduler.notifier.publish = monitor.record - scheduler.start() # check_pause is True when rid with higher priority is prepare_done - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) - self.assertFalse(scheduler.check_pause(0)) - scheduler.submit("main", expid, 0, None, False) - self.assertFalse(scheduler.check_pause(0)) - loop.run_until_complete(monitor.wait_until(1, "arrive", "prepare_done")) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertFalse(scheduler.check_pause(0)) + self.scheduler.submit("main", expid_bg, -99, None, False) + self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "running")) + self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.submit("main", expid, 0, None, False) + self.assertFalse(self.scheduler.check_pause(0)) + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "prepare_done")) + self.assertTrue(self.scheduler.check_pause(0)) + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) + self.assertFalse(self.scheduler.check_pause(0)) # check_pause is True when request_termination is called self.assertFalse(termination_ok) - self.assertFalse(scheduler.check_pause(0)) - scheduler.request_termination(0) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(monitor.wait_until(0, "arrive", "deleting")) + self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.request_termination(0) + self.assertTrue(self.scheduler.check_pause(0)) + self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "deleting")) self.assertTrue(termination_ok) - loop.run_until_complete(scheduler.stop()) - def test_close_with_active_runs(self): """Check scheduler exits with experiments still running""" - loop = self.loop - - scheduler = Scheduler(_RIDCounter(0), {}, None, None) - expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor(self) expid = _get_expid("EmptyExperiment") - scheduler.notifier.publish = monitor.record - scheduler.start() - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(monitor.wait_until(0, "arrive", "running")) + self.scheduler.submit("main", expid_bg, -99, None, False) + self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "running")) - scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(monitor.wait_until(1, "arrive", "prepare_done")) + self.scheduler.submit("main", expid, 0, None, False) + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "prepare_done")) # At this point, (at least) BackgroundExperiment is still running; make # sure we can stop the scheduler without hanging. - loop.run_until_complete(scheduler.stop()) def test_flush(self): - loop = self.loop - handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) - handlers["scheduler_check_pause"] = scheduler.check_pause + self.handlers["scheduler_check_pause"] = self.scheduler.check_pause expid = _get_expid("EmptyExperiment") expid_bg = _get_expid("CheckPauseBackgroundExperiment") expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor(self) - scheduler.notifier.publish = monitor.record - scheduler.start() # Flush with same priority - scheduler.submit("main", expid, 0, None, False) - scheduler.submit("main", expid, 0, None, True) - loop.run_until_complete(monitor.wait_until(1, "arrive", "preparing")) - self.assertEqual(monitor.last_status[0], "deleting") - loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) + self.scheduler.submit("main", expid, 0, None, False) + self.scheduler.submit("main", expid, 0, None, True) + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "preparing")) + self.assertEqual(self.monitor.last_status[0], "deleting") + self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) # Flush with higher priority - scheduler.submit("main", expid_bg, 0, None, False) + self.scheduler.submit("main", expid_bg, 0, None, False) # Make sure RID 2 go into preparing stage first - loop.run_until_complete(monitor.wait_until(2, "arrive", "preparing")) - scheduler.submit("main", expid, 1, None, True) - loop.run_until_complete(monitor.wait_until(3, "arrive", "deleting")) - self.assertEqual(monitor.last_status[2], "running") - - loop.run_until_complete(scheduler.stop()) + self.loop.run_until_complete(self.monitor.wait_until(2, "arrive", "preparing")) + self.scheduler.submit("main", expid, 1, None, True) + self.loop.run_until_complete(self.monitor.wait_until(3, "arrive", "deleting")) + self.assertEqual(self.monitor.last_status[2], "running") def tearDown(self): + self.loop.run_until_complete(self.scheduler.stop()) self.loop.close() From 81a6559928591d1e0f3c5d337457a333751713e9 Mon Sep 17 00:00:00 2001 From: Deepskyhunter Date: Fri, 5 Aug 2022 15:10:50 +0800 Subject: [PATCH 35/35] test_scheduler: Create and apply AssertScheduler --- artiq/test/test_scheduler.py | 79 +++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index ec97613d3f..15c1ab00fa 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -124,7 +124,40 @@ def remove_flag(self, rid, condition, status): del self.flags[condition][rid] -class SchedulerCase(unittest.TestCase): +class AssertScheduler: + def assertStatusEqual(self, rid, status): + rid_status = self.monitor.last_status[rid] + if rid_status != status: + raise AssertionError(f"Status of rid {rid} should be " + f"{status}, instead of {rid_status}") + + def assertArriveStatus(self, rid, status, time_out=10): + try: + self.loop.run_until_complete(asyncio.wait_for( + self.monitor.wait_until(rid, "arrive", status), + time_out)) + except asyncio.TimeoutError: + raise AssertionError(f"rid {rid} did not arrive " + f"{status} within {time_out}s") + + def assertStopped(self, task, time_out=10): + try: + self.loop.run_until_complete(asyncio.wait_for(task, time_out)) + except asyncio.TimeoutError: + raise AssertionError(f"{task} did not complete within {time_out}s") + + def assertFirstLeave(self, first_rid, rids, status): + done, pending = self.loop.run_until_complete(asyncio.wait( + [self.monitor.wait_until(rid, "leave", status) for rid in rids], + return_when=asyncio.FIRST_COMPLETED)) + for task in pending: + task.cancel() + if done.pop().result() != first_rid: + raise AssertionError(f"rid {first_rid} did not leave" + f" {status} first") + + +class SchedulerCase(unittest.TestCase, AssertScheduler): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) @@ -145,8 +178,8 @@ def test_steps(self): # This one (RID 1) gets run instead. self.scheduler.submit("main", expid, 0, None, False) - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(self.monitor.last_status[0], "pending") + self.assertArriveStatus(1, "deleting") + self.assertStatusEqual(0, "pending") def test_pending_priority(self): """Check due dates take precedence over priorities when waiting to @@ -169,18 +202,8 @@ def test_pending_priority(self): self.scheduler.submit("main", expid_empty, high_priority, late) self.scheduler.submit("main", expid_empty, middle_priority, early) - wait_RID1_leave = self.loop.create_task( - self.monitor.wait_until(1, "leave", "pending")) - wait_RID2_leave = self.loop.create_task( - self.monitor.wait_until(2, "leave", "pending")) - done, pending = self.loop.run_until_complete(asyncio.wait( - [wait_RID1_leave, wait_RID2_leave], - return_when=asyncio.FIRST_COMPLETED)) - self.assertIn(wait_RID2_leave, done) - for task in pending: - task.cancel() - - self.loop.run_until_complete(self.monitor.wait_until(2, "arrive", "deleting")) + self.assertFirstLeave(2, [1, 2], "pending") + self.assertArriveStatus(2, "deleting") def test_pause(self): termination_ok = False @@ -199,13 +222,13 @@ def check_termination(mod): # check_pause is True when rid with higher priority is prepare_done self.scheduler.submit("main", expid_bg, -99, None, False) - self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "running")) + self.assertArriveStatus(0, "running") self.assertFalse(self.scheduler.check_pause(0)) self.scheduler.submit("main", expid, 0, None, False) self.assertFalse(self.scheduler.check_pause(0)) - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "prepare_done")) + self.assertArriveStatus(1, "prepare_done") self.assertTrue(self.scheduler.check_pause(0)) - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) + self.assertArriveStatus(1, "deleting") self.assertFalse(self.scheduler.check_pause(0)) # check_pause is True when request_termination is called @@ -213,7 +236,7 @@ def check_termination(mod): self.assertFalse(self.scheduler.check_pause(0)) self.scheduler.request_termination(0) self.assertTrue(self.scheduler.check_pause(0)) - self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "deleting")) + self.assertArriveStatus(0, "deleting") self.assertTrue(termination_ok) def test_close_with_active_runs(self): @@ -224,10 +247,10 @@ def test_close_with_active_runs(self): expid = _get_expid("EmptyExperiment") self.scheduler.submit("main", expid_bg, -99, None, False) - self.loop.run_until_complete(self.monitor.wait_until(0, "arrive", "running")) + self.assertArriveStatus(0, "running") self.scheduler.submit("main", expid, 0, None, False) - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "prepare_done")) + self.assertArriveStatus(1, "prepare_done") # At this point, (at least) BackgroundExperiment is still running; make # sure we can stop the scheduler without hanging. @@ -241,18 +264,18 @@ def test_flush(self): # Flush with same priority self.scheduler.submit("main", expid, 0, None, False) self.scheduler.submit("main", expid, 0, None, True) - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "preparing")) - self.assertEqual(self.monitor.last_status[0], "deleting") - self.loop.run_until_complete(self.monitor.wait_until(1, "arrive", "deleting")) + self.assertArriveStatus(1, "preparing") + self.assertStatusEqual(0, "deleting") + self.assertArriveStatus(1, "deleting") # Flush with higher priority self.scheduler.submit("main", expid_bg, 0, None, False) # Make sure RID 2 go into preparing stage first - self.loop.run_until_complete(self.monitor.wait_until(2, "arrive", "preparing")) + self.assertArriveStatus(2, "preparing") self.scheduler.submit("main", expid, 1, None, True) - self.loop.run_until_complete(self.monitor.wait_until(3, "arrive", "deleting")) - self.assertEqual(self.monitor.last_status[2], "running") + self.assertArriveStatus(3, "deleting") + self.assertStatusEqual(2, "running") def tearDown(self): - self.loop.run_until_complete(self.scheduler.stop()) + self.assertStopped(self.scheduler.stop()) self.loop.close()