Skip to content

Commit

Permalink
change on locks and flags
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenyu-ms committed Dec 27, 2023
1 parent 288af8e commit f3d6dea
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 59 deletions.
1 change: 1 addition & 0 deletions testplan/runnable/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ def _collect_task_info(self, target: TestTask) -> TaskInformation:
"Unrecognized test target of type {}".format(type(target))
)

# TODO: include executor in ancestor chain?
if isinstance(target_test, Runnable):
target_test.parent = self
target_test.cfg.parent = self.cfg
Expand Down
4 changes: 4 additions & 0 deletions testplan/runners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, **options) -> None:
self._results = OrderedDict()
self.ongoing = []

self._discard_pending = False
self.id_in_parent: Optional[str] = None

@property
Expand Down Expand Up @@ -84,6 +85,9 @@ def _loop(self) -> None:
raise NotImplementedError()

def _prepopulate_runnables(self) -> None:
# _discard_pending can be set any time
if self._discard_pending:
return
# If we are to apply test_sorter, it would be here
# but it's not easy to implement a reasonable behavior
# as _input could be a mixture of runnable/task/callable
Expand Down
63 changes: 32 additions & 31 deletions testplan/runners/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,44 @@ def __init__(self, **options) -> None:
super(LocalRunner, self).__init__(**options)
self._uid = "local_runner"

self._to_skip_remaining = False
self._ongoing_lock = threading.Lock()
self._curr_runnable_lock = threading.Lock()
self._curr_runnable = None

def execute(self, uid: str) -> TestResult:
"""Execute item implementation."""
# First retrieve the input from its UID.
if self._to_skip_remaining:
target = self._input[uid]

if self._discard_pending:
# to skip materialize, should be disposed immediately
return TestResult()

target = self._input[uid]

# Inspect the input type. Tasks must be materialized before
# they can be run.
if isinstance(target, Test):
runnable = target
elif isinstance(target, tasks.Task):
runnable = target.materialize()
elif callable(target):
runnable = target()
else:
raise TypeError(f"Cannot execute target of type {type(target)}")

# guard
if not isinstance(runnable, Test):
raise TypeError(f"Cannot execute target of type {type(runnable)}")
# pass the ball
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg

with self._ongoing_lock:
with self._curr_runnable_lock:
# Inspect the input type. Tasks must be materialized before
# they can be run.
if isinstance(target, Test):
runnable = target
elif isinstance(target, tasks.Task):
runnable = target.materialize()
elif callable(target):
runnable = target()
else:
raise TypeError(
f"Cannot execute target of type {type(target)}"
)

if not isinstance(runnable, Test):
raise TypeError(
f"Cannot execute target of type {type(runnable)}"
)
if not runnable.parent:
runnable.parent = self
runnable.cfg.parent = self.cfg
self._curr_runnable = runnable

result = self._curr_runnable.run()
with self._ongoing_lock:

with self._curr_runnable_lock:
self._curr_runnable = None

return result
Expand Down Expand Up @@ -90,8 +91,8 @@ def _loop(self) -> None:
exc,
)
finally:
with self._ongoing_lock:
if not self._to_skip_remaining:
with self._curr_runnable_lock:
if not self._discard_pending:
# otherwise result from aborted test included
self._results[next_uid] = result
self.ongoing.pop(0)
Expand Down Expand Up @@ -126,8 +127,8 @@ def aborting(self) -> None:
self.discard_pending_tasks(reluctantly=True)

def discard_pending_tasks(self, reluctantly: bool):
self._to_skip_remaining = True
with self._ongoing_lock:
with self._curr_runnable_lock:
self._discard_pending = True
if self._curr_runnable:
self._curr_runnable.abort()
if reluctantly:
Expand Down
36 changes: 17 additions & 19 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ class Worker(WorkerBase):
def __init__(self, **options) -> None:
super().__init__(**options)
self._handler = None
self._curr = None
self._curr_mutex = threading.Lock()
self._curr_runnable_lock = threading.Lock()
self._curr_runnable = None

@property
def handler(self):
Expand Down Expand Up @@ -227,13 +227,12 @@ def _loop(self, transport: QueueClient) -> None:
elif received.cmd == Message.TaskSending:
results = []
for item in received.data:
# XXX
if self._discard_running.is_set():
break

task_result = self.execute(item)
if not self._discard_running.is_set():
results.append(task_result)

if self.cfg.skip_strategy.should_skip_rest_tests(
task_result.result.report.status
):
Expand All @@ -256,19 +255,18 @@ def execute(self, task: Task) -> TaskResult:
:return: Task result.
"""
try:
runnable: Test = task.materialize()

if isinstance(runnable, entity.Runnable):
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg

with self._curr_mutex:
self._curr = runnable
with self._curr_runnable_lock:
runnable: Test = task.materialize()

if isinstance(runnable, entity.Runnable):
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg
self._curr_runnable = runnable
result: TestResult = runnable.run()
with self._curr_mutex:
self._curr = None
with self._curr_runnable_lock:
self._curr_runnable = None

except BaseException:
task_result = TaskResult(
Expand All @@ -284,9 +282,9 @@ def execute(self, task: Task) -> TaskResult:

def discard_running_tasks(self):
self._discard_running.set()
with self._curr_mutex:
if self._curr is not None:
self._curr.abort()
with self._curr_runnable_lock:
if self._curr_runnable is not None:
self._curr_runnable.abort()


class PoolConfig(ExecutorConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def ppool(size=2):
(ppool, "tests-on-failed", ((1,),)),
),
)
def test_skip_remaining_intra_executor(exec_gen, option, abs_report_struct):
def test_intra_executor(exec_gen, option, abs_report_struct):
mockplan = TestplanMock(
name="in the middle of functional test",
skip_strategy=option,
Expand Down Expand Up @@ -145,9 +145,10 @@ def make_mt2():
(
*permutations(("lrunner", "tpool", "ppool"), 2),
("tpool", "lrunner", "tpool"),
("ppool", "ppool", "ppool"),
),
)
def test_skip_remaining_inter_executor(exec_ids):
def test_inter_executor(exec_ids):
mockplan = TestplanMock(
name="in the middle of functional test",
skip_strategy="tests-on-failed",
Expand All @@ -159,3 +160,4 @@ def test_skip_remaining_inter_executor(exec_ids):
mockplan.schedule(target=mt, resource=rid)
report = mockplan.run().report
assert len(report) == 1
assert report.entries[0].name == "mt5"
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# ``stop_on_error`` deprecated, for testing compatibility

import time

import testplan.testing.multitest as mt

from testplan.common.utils.testing import check_report
from testplan.report import (
ReportCategories,
Status,
TestReport,
TestGroupReport,
TestCaseReport,
ReportCategories,
TestGroupReport,
TestReport,
)


Expand Down
6 changes: 3 additions & 3 deletions tests/unit/testplan/runners/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import testplan.testing.multitest as mt
from testplan.common.utils.selector import Eq
from testplan.runnable import TestRunner as MyTestRunner
from testplan.runnable import TestRunner
from testplan.runners.local import LocalRunner


Expand Down Expand Up @@ -37,7 +37,7 @@ def gen_mt(*suites):
),
)
def test_local_discard_pending(pre_sleep, post_sleep, out_sleep, has_result):
par = MyTestRunner(name="in-the-middle-of-unit-tests")
par = TestRunner(name="in-the-middle-of-unit-tests")
par.add_resource(LocalRunner(), "non-express")
mt = gen_mt(Suite(pre_sleep, post_sleep))
par.add(mt, "non-express")
Expand All @@ -54,5 +54,5 @@ def test_local_discard_pending(pre_sleep, post_sleep, out_sleep, has_result):
assert len(repo) == 1
assert len(repo.entries[0]) == 1
else:
assert r._to_skip_remaining is True
assert r._discard_pending is True
assert len(r.results) == 0

0 comments on commit f3d6dea

Please sign in to comment.