Skip to content

Commit

Permalink
change on locks and flags
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenyu-ms committed Dec 27, 2023
1 parent 288af8e commit 35151a2
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 50 deletions.
1 change: 1 addition & 0 deletions testplan/runnable/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ def _collect_task_info(self, target: TestTask) -> TaskInformation:
"Unrecognized test target of type {}".format(type(target))
)

# TODO: include executor in ancestor chain?
if isinstance(target_test, Runnable):
target_test.parent = self
target_test.cfg.parent = self.cfg
Expand Down
4 changes: 4 additions & 0 deletions testplan/runners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, **options) -> None:
self._results = OrderedDict()
self.ongoing = []

self._discard_pending = False
self.id_in_parent: Optional[str] = None

@property
Expand Down Expand Up @@ -84,6 +85,9 @@ def _loop(self) -> None:
raise NotImplementedError()

def _prepopulate_runnables(self) -> None:
# _discard_pending can be set any time
if self._discard_pending:
return
# If we are to apply test_sorter, it would be here
# but it's not easy to implement a reasonable behavior
# as _input could be a mixture of runnable/task/callable
Expand Down
63 changes: 32 additions & 31 deletions testplan/runners/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,44 @@ def __init__(self, **options) -> None:
super(LocalRunner, self).__init__(**options)
self._uid = "local_runner"

self._to_skip_remaining = False
self._ongoing_lock = threading.Lock()
self._curr_runnable_lock = threading.Lock()
self._curr_runnable = None

def execute(self, uid: str) -> TestResult:
"""Execute item implementation."""
# First retrieve the input from its UID.
if self._to_skip_remaining:
target = self._input[uid]

if self._discard_pending:
# to skip materialize, should be disposed immediately
return TestResult()

target = self._input[uid]

# Inspect the input type. Tasks must be materialized before
# they can be run.
if isinstance(target, Test):
runnable = target
elif isinstance(target, tasks.Task):
runnable = target.materialize()
elif callable(target):
runnable = target()
else:
raise TypeError(f"Cannot execute target of type {type(target)}")

# guard
if not isinstance(runnable, Test):
raise TypeError(f"Cannot execute target of type {type(runnable)}")
# pass the ball
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg

with self._ongoing_lock:
with self._curr_runnable_lock:
# Inspect the input type. Tasks must be materialized before
# they can be run.
if isinstance(target, Test):
runnable = target
elif isinstance(target, tasks.Task):
runnable = target.materialize()
elif callable(target):
runnable = target()
else:
raise TypeError(
f"Cannot execute target of type {type(target)}"
)

if not isinstance(runnable, Test):
raise TypeError(
f"Cannot execute target of type {type(runnable)}"
)
if not runnable.parent:
runnable.parent = self
runnable.cfg.parent = self.cfg
self._curr_runnable = runnable

result = self._curr_runnable.run()
with self._ongoing_lock:

with self._curr_runnable_lock:
self._curr_runnable = None

return result
Expand Down Expand Up @@ -90,8 +91,8 @@ def _loop(self) -> None:
exc,
)
finally:
with self._ongoing_lock:
if not self._to_skip_remaining:
with self._curr_runnable_lock:
if not self._discard_pending:
# otherwise result from aborted test included
self._results[next_uid] = result
self.ongoing.pop(0)
Expand Down Expand Up @@ -126,8 +127,8 @@ def aborting(self) -> None:
self.discard_pending_tasks(reluctantly=True)

def discard_pending_tasks(self, reluctantly: bool):
self._to_skip_remaining = True
with self._ongoing_lock:
with self._curr_runnable_lock:
self._discard_pending = True
if self._curr_runnable:
self._curr_runnable.abort()
if reluctantly:
Expand Down
36 changes: 17 additions & 19 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ class Worker(WorkerBase):
def __init__(self, **options) -> None:
super().__init__(**options)
self._handler = None
self._curr = None
self._curr_mutex = threading.Lock()
self._curr_runnable_lock = threading.Lock()
self._curr_runnable = None

@property
def handler(self):
Expand Down Expand Up @@ -227,13 +227,12 @@ def _loop(self, transport: QueueClient) -> None:
elif received.cmd == Message.TaskSending:
results = []
for item in received.data:
# XXX
if self._discard_running.is_set():
break

task_result = self.execute(item)
if not self._discard_running.is_set():
results.append(task_result)

if self.cfg.skip_strategy.should_skip_rest_tests(
task_result.result.report.status
):
Expand All @@ -256,19 +255,18 @@ def execute(self, task: Task) -> TaskResult:
:return: Task result.
"""
try:
runnable: Test = task.materialize()

if isinstance(runnable, entity.Runnable):
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg

with self._curr_mutex:
self._curr = runnable
with self._curr_runnable_lock:
runnable: Test = task.materialize()

if isinstance(runnable, entity.Runnable):
if not runnable.parent:
runnable.parent = self
if not runnable.cfg.parent:
runnable.cfg.parent = self.cfg
self._curr_runnable = runnable
result: TestResult = runnable.run()
with self._curr_mutex:
self._curr = None
with self._curr_runnable_lock:
self._curr_runnable = None

except BaseException:
task_result = TaskResult(
Expand All @@ -284,9 +282,9 @@ def execute(self, task: Task) -> TaskResult:

def discard_running_tasks(self):
self._discard_running.set()
with self._curr_mutex:
if self._curr is not None:
self._curr.abort()
with self._curr_runnable_lock:
if self._curr_runnable is not None:
self._curr_runnable.abort()


class PoolConfig(ExecutorConfig):
Expand Down

0 comments on commit 35151a2

Please sign in to comment.