Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testharness resources #29584

Draft
wants to merge 3 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conda/moose-dev/conda_build_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ moose_wasp:
- moose-wasp 2024.11.13

moose_tools:
- moose-tools 2024.11.13
- moose-tools 2024.12.20

moose_peacock:
- moose-peacock 2024.11.13
Expand Down
2 changes: 1 addition & 1 deletion conda/moose-dev/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# REMEMBER TO UPDATE the .yaml files for the following packages:
# moose/conda_build_config.yaml
# As well as any directions pertaining to modifying those files.
{% set version = "2024.12.02" %}
{% set version = "2024.12.20" %}

package:
name: moose-dev
Expand Down
2 changes: 1 addition & 1 deletion conda/moose/conda_build_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mpi:
- openmpi

moose_dev:
- moose-dev 2024.12.02
- moose-dev 2024.12.20

#### Darwin SDK SYSROOT
CONDA_BUILD_SYSROOT: # [osx]
Expand Down
3 changes: 2 additions & 1 deletion conda/tools/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# moose-dev/*
#
# As well as any directions pertaining to modifying those files.
{% set version = "2024.11.13" %}
{% set version = "2024.12.20" %}

package:
name: moose-tools
Expand Down Expand Up @@ -45,6 +45,7 @@ requirements:
- packaging
- pandas
- paramiko
- psutil
- psycopg2
- pyarrow
- pybtex
Expand Down
89 changes: 53 additions & 36 deletions python/TestHarness/OutputInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
#* Licensed under LGPL 2.1, please see LICENSE for details
#* https://www.gnu.org/licenses/lgpl-2.1.html

import contextlib
import os
import json
import threading

class OutputInterface:
""" Helper class for writing output to either memory or a file """
def __init__(self):
def __init__(self, locking=False):
# The in-memory output, if any
self.output = ''
# The path to write output to, if any
self.separate_output_path = None
# Thread lock for the output (if enabled)
self.output_lock = threading.Lock() if locking else None

class BadOutputException(Exception):
""" Exception that is thrown when bad output is detected """
Expand All @@ -25,24 +29,35 @@ def __init__(self, errors):
message = 'Bad output detected: ' + ', '.join(errors)
super().__init__(message)

def getOutputLock(self):
"""
Gets the thread lock for this system, if any.

This is safe to use in a with statement even if locking
is not enabled.
"""
return self.output_lock if self.output_lock else contextlib.suppress()

def setSeparateOutputPath(self, separate_output_path):
""" Sets the path for writing output to """
self.separate_output_path = separate_output_path

# If we have any dangling output, write it
if self.output:
self.setOutput(self.output)
self.output = ''
with self.getOutputLock():
if self.output:
self.setOutput(self.output)
self.output = ''

def getSeparateOutputFilePath(self) -> str:
""" Gets the path that this output is writing to, if any """
return self.separate_output_path

def hasOutput(self) -> bool:
""" Whether or not this object has any content written """
if self.separate_output_path:
return os.path.isfile(self.separate_output_path)
return len(self.output) > 0
with self.getOutputLock():
if self.separate_output_path:
return os.path.isfile(self.separate_output_path)
return len(self.output) > 0

def getOutput(self, sanitize: bool = True) -> str:
"""
Expand All @@ -56,46 +71,48 @@ def getOutput(self, sanitize: bool = True) -> str:
on before the output is used.
"""
output = ''
if self.separate_output_path:
try:
output = open(self.separate_output_path, 'r').read()
except FileNotFoundError:
pass
else:
output = self.output

if sanitize:
_, sanitize_failures = self._sanitizeOutput(output)
if sanitize_failures:
raise self.BadOutputException(sanitize_failures)

return output
with self.getOutputLock():
if self.separate_output_path:
try:
output = open(self.separate_output_path, 'r').read()
except FileNotFoundError:
pass
else:
output = self.output

if sanitize:
_, sanitize_failures = self._sanitizeOutput(output)
if sanitize_failures:
raise self.BadOutputException(sanitize_failures)

return output

def setOutput(self, output: str):
""" Sets the output given some output string """
if not output:
return
if self.separate_output_path:
open(self.separate_output_path, 'w').write(output)
else:
self.output = output
with self.getOutputLock():
if self.separate_output_path:
open(self.separate_output_path, 'w').write(output)
else:
self.output = output

def appendOutput(self, output: str):
""" Appends to the output """
if not output:
return
if self.separate_output_path:
open(self.separate_output_path, 'a').write(output)
else:
self.output += output
with self.getOutputLock():
if self.separate_output_path:
open(self.separate_output_path, 'a').write(output)
else:
self.output += output

def clearOutput(self):
""" Clears the output """
if self.separate_output_path:
if os.path.exists(self.separate_output_path):
os.remove(self.separate_output_path)
else:
self.output = ''
with self.getOutputLock():
if self.separate_output_path:
if os.path.exists(self.separate_output_path):
os.remove(self.separate_output_path)
else:
self.output = ''

@staticmethod
def _sanitizeOutput(output):
Expand Down
53 changes: 52 additions & 1 deletion python/TestHarness/TestHarness.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ def cleanup(self):
summary += fatal_error
print(util.colorText(summary, "", html=True, colored=self.options.colored, code=self.options.code))
else:
# Fill summary footer
summary = ''

# Number of tests, their status, and timing
num_nonzero_timing = sum(1 if float(tup[0].getTiming()) > 0 else 0 for tup in self.test_table)
if num_nonzero_timing > 0:
timing_max = max(float(tup[0].getTiming()) for tup in self.test_table)
Expand All @@ -739,6 +743,13 @@ def cleanup(self):
summary = f'Ran {self.num_passed + self.num_failed} tests in {stats["time_total"]:.1f} seconds.'
summary += f' Average test time {timing_avg:.1f} seconds,'
summary += f' maximum test time {timing_max:.1f} seconds.'
# Memory usage, if available
max_memory = [tup[0].getMaxMemoryUsage() for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]]
if max_memory:
max_max_memory = max(max_memory)
avg_max_memory = sum(max_memory) / len(max_memory)
summary += f'\nEstimated maximum test memory usage maximum {util.humanMemory(max_max_memory)}, '
summary += f'average {util.humanMemory(avg_max_memory)}.'
print(summary)

# Get additional results from the scheduler
Expand Down Expand Up @@ -809,6 +820,25 @@ def cleanup(self):
print(str(group[0]).ljust((self.options.term_cols - (len(group[1]) + 4)), ' '), f'[{group[1]}s]')
print('\n')

if self.options.largest_jobs:
valued_tups = [tup for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]]
sorted_tups = sorted(valued_tups, key=lambda tup: tup[0].getMaxMemoryUsage(), reverse=True)

print('\n%d largest jobs:' % self.options.largest_jobs)
print(('-' * (self.options.term_cols)))

# Copy the current options and force timing to be true so that
# we get memory when we call formatResult() below
options_with_timing = copy.deepcopy(self.options)
options_with_timing.timing = True

for tup in sorted_tups[0:self.options.largest_jobs]:
job = tup[0]
if not job.isSkip() and job.getMaxMemoryUsage() > 0:
print(util.formatResult(job, options_with_timing, caveats=True))
if len(sorted_tups) == 0:
print('No jobs were completed or no jobs contained resource usage.')

all_jobs = self.scheduler.retrieveJobs()

# Gather and print the jobs with race conditions after the jobs are finished
Expand Down Expand Up @@ -1065,6 +1095,7 @@ def parseCLArgs(self, argv):
parser.add_argument('-l', '--load-average', action='store', type=float, dest='load', help='Do not run additional tests if the load average is at least LOAD')
parser.add_argument('-t', '--timing', action='store_true', dest='timing', help='Report Timing information for passing tests')
parser.add_argument('--longest-jobs', action='store', dest='longest_jobs', type=int, default=0, help='Print the longest running jobs upon completion')
parser.add_argument('--largest-jobs', action='store', dest='largest_jobs', type=int, default=20, help='Print the largest (by max memory usage) jobs upon completion')
parser.add_argument('-s', '--scale', action='store_true', dest='scaling', help='Scale problems that have SCALE_REFINE set')
parser.add_argument('-i', nargs=1, action='store', type=str, dest='input_file_name', help='The test specification file to look for (default: tests)')
parser.add_argument('--libmesh_dir', nargs=1, action='store', type=str, dest='libmesh_dir', help='Currently only needed for bitten code coverage')
Expand Down Expand Up @@ -1131,6 +1162,10 @@ def parseCLArgs(self, argv):
hpcgroup.add_argument('--hpc-no-hold', nargs=1, action='store', type=bool, default=False, dest='hpc_no_hold', help='Do not pre-create hpc jobs to be held')
hpcgroup.add_argument('--pbs-queue', nargs=1, action='store', dest='hpc_queue', type=str, metavar='', help='Submit jobs to the specified queue')

# Options for resource limits
resourcegroup = parser.add_argument_group('Resource Options', 'Options for controlling resource limits')
resourcegroup.add_argument('--max-memory', dest='max_memory', action='store', type=str, default=None, help='Set maximum memory allowed per slot (default none, ex: 100MB)')

# Try to find the terminal size if we can
# Try/except here because the terminal size could fail w/o a display
term_cols = None
Expand All @@ -1142,7 +1177,7 @@ def parseCLArgs(self, argv):

# Optionally load in the environment controlled values
term_cols = int(os.getenv('MOOSE_TERM_COLS', term_cols))
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcst')
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcsmt')

# Terminal options
termgroup = parser.add_argument_group('Terminal Options', 'Options for controlling the formatting of terminal output')
Expand Down Expand Up @@ -1234,6 +1269,22 @@ def checkAndUpdateCLArgs(self):
if not self.options.input_file_name:
self.options.input_file_name = 'tests'

# Resource usage collection
has_psutil = True
try:
import psutil
except:
has_psutil = False
# Set max_memory in bytes if set
if self.options.max_memory is not None:
try:
self.options.max_memory = util.convertMemoryToBytes(self.options.max_memory)
except:
print(f'ERROR: Failed to parse --max-memory="{self.options.max_memory}"')
sys.exit(1)
if not has_psutil:
print(f'ERROR: --max-memory cannot be used because the python module "psutil" is not available')

def postRun(self, specs, timing):
return

Expand Down
53 changes: 51 additions & 2 deletions python/TestHarness/runners/Runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
#* Licensed under LGPL 2.1, please see LICENSE for details
#* https://www.gnu.org/licenses/lgpl-2.1.html

import os, json
import os, threading, time, traceback
from collections import namedtuple
from TestHarness import OutputInterface, util

class Runner(OutputInterface):
# Helper struct for storing information about sampled resource usage
ResourceUsage = namedtuple('ResourceUsage', 'time mem_bytes')

"""
Base class for running a process via a command.

Expand All @@ -19,7 +23,8 @@ class Runner(OutputInterface):
or externally (i.e., PBS, slurm, etc on HPC)
"""
def __init__(self, job, options):
OutputInterface.__init__(self)
# Output is locking so that the resource thread can concurrently write
OutputInterface.__init__(self, locking=True)

# The job that this runner is for
self.job = job
Expand Down Expand Up @@ -109,3 +114,47 @@ def readOutput(self, stream):
if output and output[-1] != '\n':
output += '\n'
return output

def getResourceUsage(self):
"""
To be overridden by derived Runners that support resource usage collection

Should return a list of ResourceUsage objects
"""
return None

def getMaxMemoryUsage(self):
"""
Get the max memory usage (in bytes) of the spawned process if it was
able to be captured
"""
resource_usage = self.getResourceUsage()
if not resource_usage: # runner doesn't support it
return None
max_mem = 0
for usage in resource_usage:
max_mem = max(max_mem, usage.mem_bytes)
return max_mem

def checkResourceUsage(self, usage):
"""
Checks the resource usage to ensure that it does not go over
limits. Will kill the job if so.

Usage should be a ResourceUsage object
"""
# Scale all of the requirements on a per-slot basis
slots = self.job.getSlots()

# Check for memory overrun if max is set
if self.options.max_memory is not None:
allowed_mem = slots * self.options.max_memory
if usage.mem_bytes > allowed_mem:
usage_human = util.humanMemory(usage.mem_bytes)
allowed_human = util.humanMemory(allowed_mem)
output = util.outputHeader('Process killed due to resource oversubscription')
output += f'Memory usage {usage_human} exceeded {allowed_human}'
self.appendOutput(output)
self.job.setStatus(self.job.error, 'EXCEEDED MEM')
self.kill()
return
Loading