Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make testplan timeout obvious for users in report #1184

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions testplan/common/entity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from schema import Or

from testplan.common.config import Config, ConfigOption
from testplan.common.report import Status
from testplan.common.utils import logger
from testplan.common.utils.path import default_runpath, makedirs, makeemptydirs
from testplan.common.utils.strings import slugify, uuid4
Expand Down Expand Up @@ -690,6 +691,7 @@ def abort(self):
self.logger.info("Aborting %s", self)
self.aborting()
self._aborted = True
self.report.status_override = Status.INCOMPLETE
self.logger.info("Aborted %s", self)

def abort_dependencies(self):
Expand Down Expand Up @@ -1041,7 +1043,7 @@ def _run_batch_steps(self):
"""
Runs the runnable object by executing a batch of steps.
"""
start_threads, start_procs = self._get_start_info()
start_threads, start_procs = self._get_process_info()

self._add_step(self.setup)
self.add_pre_resource_steps()
Expand All @@ -1060,16 +1062,16 @@ def _run_batch_steps(self):
self._post_run_checks(start_threads, start_procs)

@staticmethod
def _get_start_info():
def _get_process_info(recursive=False):
"""
:return: lists of threads and child processes, to be passed to the
_post_run_checks method after the run has finished.
"""
start_threads = threading.enumerate()
threads = threading.enumerate()
current_proc = psutil.Process()
start_children = current_proc.children()
children = current_proc.children(recursive)

return start_threads, start_children
return threads, children

def _post_run_checks(self, start_threads, start_procs):
"""
Expand Down
65 changes: 61 additions & 4 deletions testplan/runnable/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import os
import random
import re
import sys
import time
import uuid
import webbrowser
from collections import OrderedDict
from copy import copy
from dataclasses import dataclass
from itertools import zip_longest
from traceback import format_stack
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -71,6 +73,7 @@
is_task_target,
)
from testplan.testing import common, filtering, listing, ordering, tagging
from testplan.testing.result import Result
from testplan.testing.base import Test, TestResult
from testplan.testing.listing import Lister
from testplan.testing.multitest import MultiTest
Expand Down Expand Up @@ -1141,6 +1144,24 @@ def add_post_resource_steps(self):
super(TestRunner, self).add_post_resource_steps()
self._add_step(self._stop_resource_monitor)

def _collect_timeout_info(self):
threads, processes = self._get_process_info(recursive=True)
self._timeout_info = {"threads": [], "processes": []}
for thread in threads:
self._timeout_info["threads"].append(
os.linesep.join(
[thread.name]
+ format_stack(sys._current_frames()[thread.ident])
)
)

for process in processes:
command = " ".join(process.cmdline()) or process
parent_pid = getattr(process, "ppid", lambda: None)()
self._timeout_info["processes"].append(
f"Pid: {process.pid}, Parent pid: {parent_pid}, {command}"
)

def _wait_ongoing(self):
# TODO: if a pool fails to initialize we could reschedule the tasks.
if self.resources.start_exceptions:
Expand All @@ -1154,16 +1175,14 @@ def _wait_ongoing(self):

while self.active:
if self.cfg.timeout and time.time() - _start_ts > self.cfg.timeout:
msg = (
f"Timeout: Aborting execution after {self.cfg.timeout} seconds",
)
self._collect_timeout_info()
msg = f"Timeout: Aborting execution after {self.cfg.timeout} seconds"
self.result.report.logger.error(msg)
self.logger.error(msg)

# Abort resources e.g pools
for dep in self.abort_dependencies():
self._abort_entity(dep)

break

pending_work = False
Expand Down Expand Up @@ -1251,6 +1270,44 @@ def _create_result(self):

step_result = self._merge_reports(test_rep_lookup) and step_result

if hasattr(self, "_timeout_info"):
msg = f"Testplan timed out after {self.cfg.timeout} seconds"
timeout_entry = TestGroupReport(
name="Testplan timeout",
description=msg,
category=ReportCategories.SYNTHESIZED,
# status_override=Status.ERROR,
)
timeout_case = TestCaseReport(
name="Testplan timeout",
description=msg,
status_override=Status.ERROR,
)
log_result = Result()
log_result.log(
message=f"".join(
f"{log['created'].strftime('%Y-%m-%d %H:%M:%S')} {log['levelname']} {log['message']}\n"
for log in self.report.flattened_logs
),
description="Logs from testplan",
)

log_result.log(
message=os.linesep.join(self._timeout_info["threads"]),
description="Stack trace from threads",
)

log_result.log(
message=os.linesep.join(self._timeout_info["processes"])
if len(self._timeout_info["processes"])
else "No child processes",
description="Running child processes",
)

timeout_case.extend(log_result.serialized_entries)
timeout_entry.append(timeout_case)
plan_report.append(timeout_entry)

# Reset UIDs of the test report and all of its children in UUID4 format
if self._reset_report_uid:
plan_report.reset_uid()
Expand Down
14 changes: 11 additions & 3 deletions testplan/runners/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,14 @@ def stopping(self) -> None:
def aborting(self) -> None:
"""Aborting logic."""
self.discard_pending_tasks(
report_status=Status.ERROR, report_reason=f"due to {self} aborted"
report_status=Status.INCOMPLETE,
report_reason=f"due to {self} aborted",
)

def discard_pending_tasks(
self, report_status: Status = Status.NONE, report_reason: str = ""
self,
report_status: Status = Status.INCOMPLETE,
report_reason: str = "",
):
with self._curr_runnable_lock:
self._discard_pending = True
Expand All @@ -144,19 +147,24 @@ def discard_pending_tasks(
self.logger.warning("Discard pending tasks of %s.", self)
while self.ongoing:
uid = self.ongoing.pop(0)
if report_status:
try:
result = self.added_items[uid].result
result.report.status_override = report_status
except (KeyError, AttributeError):
result = TestResult()
result.report = TestGroupReport(
name=uid,
category=ReportCategories.ERROR,
)
result.report.status_override = report_status
else:
result.report.logger.warning(
"Test[%s] discarded%s.",
uid,
" " + report_reason if report_reason else "",
)
self._results[uid] = result

if report_reason:
self.logger.warning(
"Discarding Test[%s] %s.", uid, report_reason
Expand Down
Loading