From 9bac08a99b32835a32d4a83c41ac741dbf0c9b12 Mon Sep 17 00:00:00 2001 From: Brett Holman Date: Mon, 6 Jan 2025 22:06:38 -0700 Subject: [PATCH] test: disambiguate resource cleanup from test failure (#5926) Add a background thread which is responsible for instance cleanup in integration tests. --- tests/integration_tests/conftest.py | 18 +++ tests/integration_tests/instances.py | 4 +- tests/integration_tests/reaper.py | 203 +++++++++++++++++++++++++ tests/integration_tests/test_reaper.py | 141 +++++++++++++++++ 4 files changed, 364 insertions(+), 2 deletions(-) create mode 100644 tests/integration_tests/reaper.py create mode 100644 tests/integration_tests/test_reaper.py diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 03ecd3cdbae..5365b07a5a9 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -15,6 +15,7 @@ from pycloudlib.cloud import ImageType from pycloudlib.lxd.instance import LXDInstance +import tests.integration_tests.reaper as reaper from tests.integration_tests import integration_settings from tests.integration_tests.clouds import ( AzureCloud, @@ -75,6 +76,7 @@ def disable_subp_usage(request): _SESSION_CLOUD: IntegrationCloud +REAPER: reaper._Reaper @pytest.fixture(scope="session") @@ -472,9 +474,13 @@ def _generate_profile_report() -> None: def pytest_sessionstart(session) -> None: """do session setup""" global _SESSION_CLOUD + global REAPER + log.info("starting session") try: _SESSION_CLOUD = get_session_cloud() setup_image(_SESSION_CLOUD) + REAPER = reaper._Reaper() + REAPER.start() except Exception as e: if _SESSION_CLOUD: # if a _SESSION_CLOUD was allocated, clean it up @@ -485,10 +491,13 @@ def pytest_sessionstart(session) -> None: pytest.exit( f"{type(e).__name__} in session setup: {str(e)}", returncode=2 ) + log.info("started session") def pytest_sessionfinish(session, exitstatus) -> None: """do session teardown""" + global REAPER + log.info("finishing session") try: if integration_settings.INCLUDE_COVERAGE: _generate_coverage_report() @@ -504,9 +513,18 @@ def pytest_sessionfinish(session, exitstatus) -> None: _SESSION_CLOUD.snapshot_id, e, ) + try: + REAPER.stop() + except Exception as e: + log.warning( + "Could not tear down instance reaper thread: %s(%s)", + type(e).__name__, + e, + ) try: _SESSION_CLOUD.destroy() except Exception as e: log.warning( "Could not destroy session cloud: %s(%s)", type(e).__name__, e ) + log.info("finish session") diff --git a/tests/integration_tests/instances.py b/tests/integration_tests/instances.py index c4427506881..d745219e99b 100644 --- a/tests/integration_tests/instances.py +++ b/tests/integration_tests/instances.py @@ -14,7 +14,7 @@ from pycloudlib.result import Result from tests.helpers import cloud_init_project_dir -from tests.integration_tests import integration_settings +from tests.integration_tests import conftest, integration_settings from tests.integration_tests.decorators import retry from tests.integration_tests.util import ASSETS_DIR @@ -330,6 +330,6 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): if not self.settings.KEEP_INSTANCE: - self.destroy() + conftest.REAPER.reap(self) else: log.info("Keeping Instance, public ip: %s", self.ip()) diff --git a/tests/integration_tests/reaper.py b/tests/integration_tests/reaper.py new file mode 100644 index 00000000000..308dbbfe575 --- /dev/null +++ b/tests/integration_tests/reaper.py @@ -0,0 +1,203 @@ +"""Defines _Reaper, which destroys instances in a background thread + +This class is intended to be a singleton which is instantiated on session setup +and cleaned on session teardown. Any instances submitted to the reaper are +destroyed. Instances that refuse to be destroyed due to external library errors +or flaky infrastructure are tracked, retried and upon test session completion +are reported to the end user as a test warning. +""" + +from __future__ import annotations # required for Python 3.8 + +import logging +import queue +import threading +import warnings +from typing import Final, List, Optional + +from tests.integration_tests.instances import IntegrationInstance + +LOG = logging.getLogger() + + +class _Reaper: + def __init__(self, timeout: float = 30.0): + # self.timeout sets the amount of time to sleep before retrying + self.timeout = timeout + # self.wake_reaper tells the reaper to wake up. + # + # A lock is used for synchronization. This means that notify() will + # block if + # the reaper is currently awake. + # + # It is set by: + # - signal interrupt indicating cleanup + # - session completion indicating cleanup + # - reaped instance indicating work to be done + self.wake_reaper: Final[threading.Condition] = threading.Condition() + + # self.exit_reaper tells the reaper loop to tear down, called once at + # end of tests + self.exit_reaper: Final[threading.Event] = threading.Event() + + # List of instances which temporarily escaped death + # The primary porpose of the reaper is to coax these instance towards + # eventual demise and report their insubordination on shutdown. + self.undead_ledger: Final[List[IntegrationInstance]] = [] + + # Queue of newly reaped instances + self.reaped_instances: Final[queue.Queue[IntegrationInstance]] = ( + queue.Queue() + ) + + # Thread object, handle used to re-join the thread + self.reaper_thread: Optional[threading.Thread] = None + + # Count the dead + self.counter = 0 + + def reap(self, instance: IntegrationInstance): + """reap() submits an instance to the reaper thread. + + An instance that is passed to the reaper must not be used again. It may + not be dead yet, but it has no place among the living. + """ + LOG.info("Reaper: receiving %s", instance.instance.id) + + self.reaped_instances.put(instance) + with self.wake_reaper: + self.wake_reaper.notify() + LOG.info("Reaper: awakened to reap") + + def start(self): + """Spawn the reaper background thread.""" + LOG.info("Reaper: starting") + self.reaper_thread = threading.Thread( + target=self._reaper_loop, name="reaper" + ) + self.reaper_thread.start() + + def stop(self): + """Stop the reaper background thread and wait for completion.""" + LOG.info("Reaper: stopping") + self.exit_reaper.set() + with self.wake_reaper: + self.wake_reaper.notify() + LOG.info("Reaper: awakened to reap") + if self.reaper_thread and self.reaper_thread.is_alive(): + self.reaper_thread.join() + LOG.info("Reaper: stopped") + + def _destroy(self, instance: IntegrationInstance) -> bool: + """destroy() destroys an instance and returns True on success.""" + try: + LOG.info("Reaper: destroying %s", instance.instance.id) + instance.destroy() + self.counter += 1 + return True + except Exception as e: + LOG.warning( + "Error while tearing down instance %s: %s ", instance, e + ) + return False + + def _reaper_loop(self) -> None: + """reaper_loop() manages all instances that have been reaped + + tasks: + - destroy newly reaped instances + - manage a ledger undead instances + - periodically attempt to kill undead instances + - die when instructed to + - ensure that every reaped instance is destroyed at least once before + reaper dies + """ + LOG.info("Reaper: exalted in life, to assist others in death") + while True: + # nap until woken or timeout + with self.wake_reaper: + self.wake_reaper.wait(timeout=self.timeout) + if self._do_reap(): + break + LOG.info("Reaper: exited") + + def _do_reap(self) -> bool: + """_do_reap does a single pass of the reaper loop + + return True if the loop should exit + """ + + new_undead_instances: List[IntegrationInstance] = [] + + # first destroy all newly reaped instances + while not self.reaped_instances.empty(): + instance = self.reaped_instances.get_nowait() + success = self._destroy(instance) + if not success: + LOG.warning( + "Reaper: failed to destroy %s", + instance.instance.id, + ) + # failure to delete, add to the ledger + new_undead_instances.append(instance) + else: + LOG.info("Reaper: destroyed %s", instance.instance.id) + + # every instance has tried at least once and the reaper has been + # instructed to tear down - so do it + if self.exit_reaper.is_set(): + if not self.reaped_instances.empty(): + # race: an instance was added to the queue after iteration + # completed. Destroy the latest instance. + self._update_undead_ledger(new_undead_instances) + return False + self._update_undead_ledger(new_undead_instances) + LOG.info("Reaper: exiting") + if self.undead_ledger: + # undead instances exist - unclean teardown + LOG.info( + "Reaper: the faults of incompetent abilities will be " + "consigned to oblivion, as myself must soon be to the " + "mansions of rest." + ) + warnings.warn(f"Test instance(s) leaked: {self.undead_ledger}") + else: + LOG.info("Reaper: duties complete, my turn to rest") + LOG.info( + "Reaper: reaped %s/%s instances", + self.counter, + self.counter + len(self.undead_ledger), + ) + return True + + # attempt to destroy all instances which previously refused to destroy + for instance in self.undead_ledger: + if self.exit_reaper.is_set() and self.reaped_instances.empty(): + # don't retry instances if the exit_reaper Event is set + break + if self._destroy(instance): + self.undead_ledger.remove(instance) + LOG.info("Reaper: destroyed %s (undead)", instance.instance.id) + + self._update_undead_ledger(new_undead_instances) + return False + + def _update_undead_ledger( + self, new_undead_instances: List[IntegrationInstance] + ): + """update the ledger with newly undead instances""" + if new_undead_instances: + if self.undead_ledger: + LOG.info( + "Reaper: instance(s) not ready to die %s, will now join " + "the ranks of the undead: %s", + new_undead_instances, + self.undead_ledger, + ) + else: + LOG.info( + "Reaper: instance(s) not ready to die %s", + new_undead_instances, + ) + self.undead_ledger.extend(new_undead_instances) + return False diff --git a/tests/integration_tests/test_reaper.py b/tests/integration_tests/test_reaper.py new file mode 100644 index 00000000000..d34b2a191b9 --- /dev/null +++ b/tests/integration_tests/test_reaper.py @@ -0,0 +1,141 @@ +"""reaper self-test""" + +import logging +import time +import warnings +from unittest import mock + +import pytest + +from tests.integration_tests import reaper +from tests.integration_tests.instances import IntegrationInstance + +LOG = logging.Logger(__name__) + + +class MockInstance(IntegrationInstance): + # because of instance id printing + instance = mock.Mock() + + def __init__(self, times_refused): + self.times_refused = times_refused + self.call_count = 0 + + # assert that destruction succeeded + self.stopped = False + + def destroy(self): + """destroy() only succeeds after failing N=times_refused times""" + if self.call_count == self.times_refused: + self.stopped = True + return + self.call_count += 1 + raise RuntimeError("I object!") + + +@pytest.mark.ci +class TestReaper: + def test_start_stop(self): + """basic setup teardown""" + + instance = MockInstance(0) + r = reaper._Reaper() + # start / stop + r.start() + r.stop() + # start / reap / stop + r.start() + r.reap(instance) + r.stop() + + # start / stop + r.start() + r.stop() + assert instance.stopped + + def test_basic_reap(self): + """basic setup teardown""" + + i_1 = MockInstance(0) + r = reaper._Reaper() + r.start() + r.reap(i_1) + r.stop() + assert i_1.stopped + + def test_unreaped_instance(self): + """a single warning should print for any number of leaked instances""" + + i_1 = MockInstance(64) + i_2 = MockInstance(64) + r = reaper._Reaper() + r.start() + r.reap(i_1) + r.reap(i_2) + with warnings.catch_warnings(record=True) as w: + r.stop() + assert len(w) == 1 + + def test_stubborn_reap(self): + """verify that stubborn instances are cleaned""" + + sleep_time = 0.000_001 + sleep_total = 0.0 + instances = [ + MockInstance(0), + MockInstance(3), + MockInstance(6), + MockInstance(9), + MockInstance(12), + MockInstance(9), + MockInstance(6), + MockInstance(3), + MockInstance(0), + ] + + # forcibly disallow sleeping, to avoid wasted time during tests + r = reaper._Reaper(timeout=0.0) + r.start() + for i in instances: + r.reap(i) + + # this should really take no time at all, waiting 1s should be plenty + # of time for the reaper to reap it when not sleeping + while sleep_total < 1.0: + # are any still undead? + any_undead = False + for i in instances: + if not i.stopped: + any_undead = True + break + if not any_undead: + # test passed + # Advance to GO, collect $400 + break + # sleep then recheck, incremental backoff + sleep_total += sleep_time + sleep_time *= 2 + time.sleep(sleep_time) + r.stop() + for i in instances: + assert i.stopped, ( + f"Reaper didn't reap stubborn instance {i} in {sleep_total}s. " + "Something appears to be broken in the reaper logic or test." + ) + + def test_start_stop_multiple(self): + """reap lots of instances + + obedient ones + """ + num = 64 + instances = [] + r = reaper._Reaper() + r.start() + for _ in range(num): + i = MockInstance(0) + instances.append(i) + r.reap(i) + r.stop() + for i in instances: + assert i.stopped