Skip to content

Commit

Permalink
test: disambiguate resource cleanup from test failure (#5926)
Browse files Browse the repository at this point in the history
Add a background thread which is responsible for instance cleanup in integration tests.
  • Loading branch information
holmanb authored Jan 7, 2025
1 parent 4ba9021 commit 9bac08a
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 2 deletions.
18 changes: 18 additions & 0 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pycloudlib.cloud import ImageType
from pycloudlib.lxd.instance import LXDInstance

import tests.integration_tests.reaper as reaper
from tests.integration_tests import integration_settings
from tests.integration_tests.clouds import (
AzureCloud,
Expand Down Expand Up @@ -75,6 +76,7 @@ def disable_subp_usage(request):


_SESSION_CLOUD: IntegrationCloud
REAPER: reaper._Reaper


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -472,9 +474,13 @@ def _generate_profile_report() -> None:
def pytest_sessionstart(session) -> None:
"""do session setup"""
global _SESSION_CLOUD
global REAPER
log.info("starting session")
try:
_SESSION_CLOUD = get_session_cloud()
setup_image(_SESSION_CLOUD)
REAPER = reaper._Reaper()
REAPER.start()
except Exception as e:
if _SESSION_CLOUD:
# if a _SESSION_CLOUD was allocated, clean it up
Expand All @@ -485,10 +491,13 @@ def pytest_sessionstart(session) -> None:
pytest.exit(
f"{type(e).__name__} in session setup: {str(e)}", returncode=2
)
log.info("started session")


def pytest_sessionfinish(session, exitstatus) -> None:
"""do session teardown"""
global REAPER
log.info("finishing session")
try:
if integration_settings.INCLUDE_COVERAGE:
_generate_coverage_report()
Expand All @@ -504,9 +513,18 @@ def pytest_sessionfinish(session, exitstatus) -> None:
_SESSION_CLOUD.snapshot_id,
e,
)
try:
REAPER.stop()
except Exception as e:
log.warning(
"Could not tear down instance reaper thread: %s(%s)",
type(e).__name__,
e,
)
try:
_SESSION_CLOUD.destroy()
except Exception as e:
log.warning(
"Could not destroy session cloud: %s(%s)", type(e).__name__, e
)
log.info("finish session")
4 changes: 2 additions & 2 deletions tests/integration_tests/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pycloudlib.result import Result

from tests.helpers import cloud_init_project_dir
from tests.integration_tests import integration_settings
from tests.integration_tests import conftest, integration_settings
from tests.integration_tests.decorators import retry
from tests.integration_tests.util import ASSETS_DIR

Expand Down Expand Up @@ -330,6 +330,6 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
if not self.settings.KEEP_INSTANCE:
self.destroy()
conftest.REAPER.reap(self)
else:
log.info("Keeping Instance, public ip: %s", self.ip())
203 changes: 203 additions & 0 deletions tests/integration_tests/reaper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Defines _Reaper, which destroys instances in a background thread
This class is intended to be a singleton which is instantiated on session setup
and cleaned on session teardown. Any instances submitted to the reaper are
destroyed. Instances that refuse to be destroyed due to external library errors
or flaky infrastructure are tracked, retried and upon test session completion
are reported to the end user as a test warning.
"""

from __future__ import annotations # required for Python 3.8

import logging
import queue
import threading
import warnings
from typing import Final, List, Optional

from tests.integration_tests.instances import IntegrationInstance

LOG = logging.getLogger()


class _Reaper:
def __init__(self, timeout: float = 30.0):
# self.timeout sets the amount of time to sleep before retrying
self.timeout = timeout
# self.wake_reaper tells the reaper to wake up.
#
# A lock is used for synchronization. This means that notify() will
# block if
# the reaper is currently awake.
#
# It is set by:
# - signal interrupt indicating cleanup
# - session completion indicating cleanup
# - reaped instance indicating work to be done
self.wake_reaper: Final[threading.Condition] = threading.Condition()

# self.exit_reaper tells the reaper loop to tear down, called once at
# end of tests
self.exit_reaper: Final[threading.Event] = threading.Event()

# List of instances which temporarily escaped death
# The primary porpose of the reaper is to coax these instance towards
# eventual demise and report their insubordination on shutdown.
self.undead_ledger: Final[List[IntegrationInstance]] = []

# Queue of newly reaped instances
self.reaped_instances: Final[queue.Queue[IntegrationInstance]] = (
queue.Queue()
)

# Thread object, handle used to re-join the thread
self.reaper_thread: Optional[threading.Thread] = None

# Count the dead
self.counter = 0

def reap(self, instance: IntegrationInstance):
"""reap() submits an instance to the reaper thread.
An instance that is passed to the reaper must not be used again. It may
not be dead yet, but it has no place among the living.
"""
LOG.info("Reaper: receiving %s", instance.instance.id)

self.reaped_instances.put(instance)
with self.wake_reaper:
self.wake_reaper.notify()
LOG.info("Reaper: awakened to reap")

def start(self):
"""Spawn the reaper background thread."""
LOG.info("Reaper: starting")
self.reaper_thread = threading.Thread(
target=self._reaper_loop, name="reaper"
)
self.reaper_thread.start()

def stop(self):
"""Stop the reaper background thread and wait for completion."""
LOG.info("Reaper: stopping")
self.exit_reaper.set()
with self.wake_reaper:
self.wake_reaper.notify()
LOG.info("Reaper: awakened to reap")
if self.reaper_thread and self.reaper_thread.is_alive():
self.reaper_thread.join()
LOG.info("Reaper: stopped")

def _destroy(self, instance: IntegrationInstance) -> bool:
"""destroy() destroys an instance and returns True on success."""
try:
LOG.info("Reaper: destroying %s", instance.instance.id)
instance.destroy()
self.counter += 1
return True
except Exception as e:
LOG.warning(
"Error while tearing down instance %s: %s ", instance, e
)
return False

def _reaper_loop(self) -> None:
"""reaper_loop() manages all instances that have been reaped
tasks:
- destroy newly reaped instances
- manage a ledger undead instances
- periodically attempt to kill undead instances
- die when instructed to
- ensure that every reaped instance is destroyed at least once before
reaper dies
"""
LOG.info("Reaper: exalted in life, to assist others in death")
while True:
# nap until woken or timeout
with self.wake_reaper:
self.wake_reaper.wait(timeout=self.timeout)
if self._do_reap():
break
LOG.info("Reaper: exited")

def _do_reap(self) -> bool:
"""_do_reap does a single pass of the reaper loop
return True if the loop should exit
"""

new_undead_instances: List[IntegrationInstance] = []

# first destroy all newly reaped instances
while not self.reaped_instances.empty():
instance = self.reaped_instances.get_nowait()
success = self._destroy(instance)
if not success:
LOG.warning(
"Reaper: failed to destroy %s",
instance.instance.id,
)
# failure to delete, add to the ledger
new_undead_instances.append(instance)
else:
LOG.info("Reaper: destroyed %s", instance.instance.id)

# every instance has tried at least once and the reaper has been
# instructed to tear down - so do it
if self.exit_reaper.is_set():
if not self.reaped_instances.empty():
# race: an instance was added to the queue after iteration
# completed. Destroy the latest instance.
self._update_undead_ledger(new_undead_instances)
return False
self._update_undead_ledger(new_undead_instances)
LOG.info("Reaper: exiting")
if self.undead_ledger:
# undead instances exist - unclean teardown
LOG.info(
"Reaper: the faults of incompetent abilities will be "
"consigned to oblivion, as myself must soon be to the "
"mansions of rest."
)
warnings.warn(f"Test instance(s) leaked: {self.undead_ledger}")
else:
LOG.info("Reaper: duties complete, my turn to rest")
LOG.info(
"Reaper: reaped %s/%s instances",
self.counter,
self.counter + len(self.undead_ledger),
)
return True

# attempt to destroy all instances which previously refused to destroy
for instance in self.undead_ledger:
if self.exit_reaper.is_set() and self.reaped_instances.empty():
# don't retry instances if the exit_reaper Event is set
break
if self._destroy(instance):
self.undead_ledger.remove(instance)
LOG.info("Reaper: destroyed %s (undead)", instance.instance.id)

self._update_undead_ledger(new_undead_instances)
return False

def _update_undead_ledger(
self, new_undead_instances: List[IntegrationInstance]
):
"""update the ledger with newly undead instances"""
if new_undead_instances:
if self.undead_ledger:
LOG.info(
"Reaper: instance(s) not ready to die %s, will now join "
"the ranks of the undead: %s",
new_undead_instances,
self.undead_ledger,
)
else:
LOG.info(
"Reaper: instance(s) not ready to die %s",
new_undead_instances,
)
self.undead_ledger.extend(new_undead_instances)
return False
Loading

0 comments on commit 9bac08a

Please sign in to comment.