Skip to content

Commit

Permalink
Add SharedResource class to robottelo.utils (#12341)
Browse files Browse the repository at this point in the history
This class will be used to manage shared resources between multitple
processes or threads. The most common case will likely be a shared
upgrade satellite between multiple xdist workers.

I also handled an issues I encountered with the config_helpers script.

(cherry picked from commit 2d3cd63)
  • Loading branch information
JacobCallahan authored and web-flow committed Nov 14, 2023
1 parent b0e08fa commit fdb93e3
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 2 deletions.
199 changes: 199 additions & 0 deletions robottelo/utils/shared_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Allow multiple processes to communicate status on a single shared resource.
This is useful for cases where multiple processes need to wait for all other processes to be ready
before continuing with some common action. The most common use case in this framework will likely
be to wait for all pre-upgrade setups to be ready before performing the upgrade.
The system works by creating a file in /tmp with the name of the resource. This is a common file
where each process can communicate its status. The first process to register will be the main
watcher. The main watcher will wait for all other processes to be ready, then perform the action.
If the main actor fails to complete the action, and the action is recoverable, another process
will take over as the main watcher and attempt to perform the action. If the action is not
recoverable, the main watcher will fail and release all other processes.
It is recommended to use this class as a context manager, as it will automatically register and
report when the process is done.
Example:
>>> with SharedResource("target_sat.hostname", upgrade_action, **upgrade_kwargs) as resource:
... # Do pre-upgrade setup steps
... resource.ready() # tell the other processes that we are ready
... yield target_sat # give the upgraded satellite to the test
... # Do post-upgrade cleanup steps if any
"""
import json
from pathlib import Path
import time
from uuid import uuid4

from broker.helpers import FileLock


class SharedResource:
"""A class representing a shared resource.
Attributes:
action (function): The function to be executed when the resource is ready.
action_args (tuple): The arguments to be passed to the action function.
action_kwargs (dict): The keyword arguments to be passed to the action function.
action_is_recoverable (bool): Whether the action is recoverable or not.
id (str): The unique identifier of the shared resource.
resource_file (Path): The path to the file representing the shared resource.
is_main (bool): Whether the current instance is the main watcher or not.
is_recovering (bool): Whether the current instance is recovering from an error or not.
"""

def __init__(self, resource_name, action, *action_args, **action_kwargs):
"""Initializes a new instance of the SharedResource class.
Args:
resource_name (str): The name of the shared resource.
action (function): The function to be executed when the resource is ready.
action_args (tuple): The arguments to be passed to the action function.
action_kwargs (dict): The keyword arguments to be passed to the action function.
"""
self.resource_file = Path(f"/tmp/{resource_name}.shared")
self.lock_file = FileLock(self.resource_file)
self.id = str(uuid4().fields[-1])
self.action = action
self.action_is_recoverable = action_kwargs.pop("action_is_recoverable", False)
self.action_args = action_args
self.action_kwargs = action_kwargs
self.is_recovering = False

def _update_status(self, status):
"""Updates the status of the shared resource.
Args:
status (str): The new status of the shared resource.
"""
with self.lock_file:
curr_data = json.loads(self.resource_file.read_text())
curr_data["statuses"][self.id] = status
self.resource_file.write_text(json.dumps(curr_data, indent=4))

def _update_main_status(self, status):
"""Updates the main status of the shared resource.
Args:
status (str): The new main status of the shared resource.
"""
with self.lock_file:
curr_data = json.loads(self.resource_file.read_text())
curr_data["main_status"] = status
self.resource_file.write_text(json.dumps(curr_data, indent=4))

def _check_all_status(self, status):
"""Checks if all watchers have the specified status.
Args:
status (str): The status to check for.
Returns:
bool: True if all watchers have the specified status, False otherwise.
"""
with self.lock_file:
curr_data = json.loads(self.resource_file.read_text())
for watcher_id in curr_data["watchers"]:
if curr_data["statuses"].get(watcher_id) != status:
return False
return True

def _wait_for_status(self, status):
"""Waits until all watchers have the specified status.
Args:
status (str): The status to wait for.
"""
while not self._check_all_status(status):
time.sleep(1)

def _wait_for_main_watcher(self):
"""Waits for the main watcher to finish."""
while True:
curr_data = json.loads(self.resource_file.read_text())
if curr_data["main_status"] != "done":
time.sleep(60)
elif curr_data["main_status"] == "action_error":
self._try_take_over()
elif curr_data["main_status"] == "error":
raise Exception(f"Error in main watcher: {curr_data['main_watcher']}")
else:
break

def _try_take_over(self):
"""Tries to take over as the main watcher."""
with self.lock_file:
curr_data = json.loads(self.resource_file.read_text())
if curr_data["main_status"] in ("action_error", "error"):
curr_data["main_status"] = "recovering"
curr_data["main_watcher"] = self.id
self.resource_file.write_text(json.dumps(curr_data, indent=4))
self.is_main = True
self.is_recovering = True
self.wait()

def register(self):
"""Registers the current process as a watcher."""
with self.lock_file:
if self.resource_file.exists():
curr_data = json.loads(self.resource_file.read_text())
self.is_main = False
else: # First watcher to register, becomes the main watcher, and creates the file
curr_data = {
"watchers": [],
"statuses": {},
"main_watcher": self.id,
"main_status": "waiting",
}
self.is_main = True
curr_data["watchers"].append(self.id)
curr_data["statuses"][self.id] = "pending"
self.resource_file.write_text(json.dumps(curr_data, indent=4))

def ready(self):
"""Marks the current process as ready to perform the action."""
self._update_status("ready")
self.wait()

def done(self):
"""Marks the current process as done performing post actions."""
self._update_status("done")

def act(self):
"""Attempt to perform the action."""
try:
self.action(*self.action_args, **self.action_kwargs)
except Exception as err:
self._update_main_status("error")
raise err

def wait(self):
"""Top-level wait function, separating behavior between main and non-main watchers."""
if self.is_main and not (self.is_recovering and not self.action_is_recoverable):
self._wait_for_status("ready")
self._update_main_status("acting")
self.act()
self._update_main_status("done")
else:
self._wait_for_main_watcher()

def __enter__(self):
"""Registers the current process as a watcher and returns the instance."""
self.register()
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Marks the current process as done and updates the main watcher if needed."""
if exc_type is FileNotFoundError:
raise exc_value
if exc_type is None:
self.done()
if self.is_main:
self._wait_for_status("done")
self.resource_file.unlink()
else:
self._update_status("error")
if self.is_main:
self._update_main_status("error")
raise exc_value
5 changes: 3 additions & 2 deletions scripts/config_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def merge_nested_dictionaries(original, new, overwrite=False):
# if the key is not in the original, add it
if key not in original:
original[key] = value
# if the key is in the original, and the value is a dictionary, recurse
elif isinstance(value, dict):
# if the key is in the original, and original[key] and value are dictionaries, recurse
elif isinstance(original[key], dict) and isinstance(value, dict):
# use deepdiff to check if the dictionaries are the same
if deepdiff.DeepDiff(original[key], value):
original[key] = merge_nested_dictionaries(original[key], value, overwrite)
Expand All @@ -24,6 +24,7 @@ def merge_nested_dictionaries(original, new, overwrite=False):
# if the key is in the original, and the value is a list, ask the user
elif overwrite == "ask":
choice_prompt = (
"-------------------------\n"
f"The current value for {key} is {original[key]}.\n"
"Please choose an option:\n"
"1. Keep the current value\n"
Expand Down
61 changes: 61 additions & 0 deletions tests/robottelo/test_shared_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import multiprocessing
from pathlib import Path
import random
from threading import Thread
import time

from robottelo.utils.shared_resource import SharedResource


def upgrade_action(*args, **kwargs):
print(f"Upgrading satellite with {args=} and {kwargs=}")
time.sleep(1)
print("Satellite upgraded!")


def run_resource(resource_name):
time.sleep(random.random() * 5) # simulate random pre-setup
with SharedResource(resource_name, upgrade_action) as resource:
assert Path(f"/tmp/{resource_name}.shared").exists()
time.sleep(5) # simulate setup actions
resource.ready()
time.sleep(1) # simulate cleanup actions


def test_shared_resource():
"""Test the SharedResource class."""
with SharedResource("test_resource", upgrade_action, 1, 2, 3, foo="bar") as resource:
assert Path("/tmp/test_resource.shared").exists()
assert resource.is_main
assert not resource.is_recovering
assert resource.action == upgrade_action
assert resource.action_args == (1, 2, 3)
assert resource.action_kwargs == {"foo": "bar"}
assert not resource.action_is_recoverable

resource.ready()
assert resource._check_all_status("ready")

assert not Path("/tmp/test_resource.shared").exists()


def test_shared_resource_multiprocessing():
"""Test the SharedResource class with multiprocessing."""
with multiprocessing.Pool(2) as pool:
pool.map(run_resource, ["test_resource_mp", "test_resource_mp"])

assert not Path("/tmp/test_resource_mp.shared").exists()


def test_shared_resource_multithreading():
"""Test the SharedResource class with multithreading."""
t1 = Thread(target=run_resource, args=("test_resource_th",))
t2 = Thread(target=run_resource, args=("test_resource_th",))

t1.start()
t2.start()

t1.join()
t2.join()

assert not Path("/tmp/test_resource_th.shared").exists()

0 comments on commit fdb93e3

Please sign in to comment.