Skip to content

Commit

Permalink
Merge pull request #401 from simonsobs/koopman/agent-fixture-timeout-…
Browse files Browse the repository at this point in the history
…class

Interrupt agent within pytest fixture if it crashes during testing
  • Loading branch information
BrianJKoopman authored Aug 29, 2024
2 parents 42fb194 + 5dd79e2 commit fe03421
Showing 1 changed file with 99 additions and 28 deletions.
127 changes: 99 additions & 28 deletions ocs/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import coverage.data
import urllib.request

from threading import Timer
from urllib.error import URLError

from ocs.ocs_client import OCSClient
Expand All @@ -14,8 +15,8 @@
SIGINT_TIMEOUT = 10


def create_agent_runner_fixture(agent_path, agent_name, args=None):
"""Create a pytest fixture for running a given OCS Agent.
class _AgentRunner:
"""Class to manage running an agent as a subprocess during testing.
Parameters:
agent_path (str): Relative path to Agent,
Expand All @@ -24,45 +25,115 @@ def create_agent_runner_fixture(agent_path, agent_name, args=None):
args (list): Additional CLI arguments to add when starting the Agent
"""
@pytest.fixture()
def run_agent(cov):
env = os.environ.copy()
env['COVERAGE_FILE'] = f'.coverage.agent.{agent_name}'
env['OCS_CONFIG_DIR'] = os.getcwd()
cmd = ['coverage', 'run',
'--rcfile=./.coveragerc',
agent_path,
'--site-file',
'./default.yaml']

def __init__(self, agent_path, agent_name, args):
self.env = os.environ.copy()
self.env['COVERAGE_FILE'] = f'.coverage.agent.{agent_name}'
self.env['OCS_CONFIG_DIR'] = os.getcwd()
self.cmd = [
'python',
'-u',
'-m',
'coverage',
'run',
'--rcfile=./.coveragerc',
agent_path,
'--site-file',
'./default.yaml'
]
if args is not None:
cmd.extend(args)
agentproc = subprocess.Popen(cmd,
env=env,
self.cmd.extend(args)
self.agent_name = agent_name
self.proc = None
self._timer = None
self._timedout = False

def run(self, timeout):
"""Run the agent subprocess.
This runs the agent subprocess defined by ``self.cmd``. Output is
written to a ``PIPE``. If the agent does not exit within the given
timeout it will be interrupted with a ``SIGKILL``.
Parameters:
timeout (float): Timeout in seconds to wait for agent to exit.
"""
self.proc = subprocess.Popen(self.cmd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
preexec_fn=os.setsid)

def raise_subprocess(msg):
stdout, stderr = agentproc.stdout.read(), agentproc.stderr.read()
print(f'Here is stdout from {agent_name}:\n{stdout}')
print(f'Here is stderr from {agent_name}:\n{stderr}')
raise RuntimeError(msg)
# start timer for if agent crashes and hangs
self._timer = Timer(timeout, self._interrupt)
self._timer.start()

# Wait briefly then make sure subprocess hasn't already exited.
time.sleep(1)
if agentproc.poll() is not None:
raise_subprocess(f"Agent failed to startup, cmd: {cmd}")
if self.proc.poll() is not None:
self._timer.cancel()
self._raise_subprocess(f"Agent failed to startup, cmd: {self.cmd}")

yield
def _interrupt(self):
# not graceful, but handles really misbehaved agent subprocesses
self.proc.send_signal(signal.SIGKILL)
self._timedout = True

def _raise_subprocess(self, msg):
stdout, stderr = self.proc.stdout.read(), self.proc.stderr.read()
print(f'Here is stdout from {self.agent_name}:\n{stdout}')
print(f'Here is stderr from {self.agent_name}:\n{stderr}')
raise RuntimeError(msg)

def shutdown(self):
"""Shutdown the agent process.
If the agent does not respond to a ``SIGINT`` then output is printed,
and an exception raised.
# shutdown Agent
agentproc.send_signal(signal.SIGINT)
"""
# don't send SIGINT if we've already sent SIGKILL
if not self._timedout:
self.proc.send_signal(signal.SIGINT)
self._timer.cancel()

try:
agentproc.communicate(timeout=SIGINT_TIMEOUT)
self.proc.communicate(timeout=SIGINT_TIMEOUT)
except subprocess.TimeoutExpired:
raise_subprocess('Agent did not terminate within '
f'{SIGINT_TIMEOUT} seconds on SIGINT.')
self._raise_subprocess('Agent did not terminate within '
f'{SIGINT_TIMEOUT} seconds on SIGINT.')

if self._timedout:
stdout, stderr = self.proc.communicate(timeout=SIGINT_TIMEOUT)
print(f'Here is stdout from {self.agent_name}:\n{stdout}')
print(f'Here is stderr from {self.agent_name}:\n{stderr}')
raise RuntimeError('Agent timed out.')


def create_agent_runner_fixture(agent_path, agent_name, args=None, timeout=60):
"""Create a pytest fixture for running a given OCS Agent.
Parameters:
agent_path (str): Relative path to Agent,
i.e. '../agents/fake_data/fake_data_agent.py'
agent_name (str): Short, unique name for the agent
args (list): Additional CLI arguments to add when starting the Agent
timeout (float): Timeout in seconds, after which the agent process will
be interrupted. This typically indicates a crash within the agent.
This timeout should be longer than you expect the agent to run for
during a given test. Defaults to 60 seconds.
"""
@pytest.fixture()
def run_agent(cov):
runner = _AgentRunner(agent_path, agent_name, args)
runner.run(timeout=timeout)

yield

runner.shutdown()

# report coverage
agentcov = coverage.data.CoverageData(
Expand Down

0 comments on commit fe03421

Please sign in to comment.