Skip to content

Commit

Permalink
API-factory setup registered host method
Browse files Browse the repository at this point in the history
  • Loading branch information
damoore044 committed May 7, 2024
1 parent 1f85547 commit 20ce1a4
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 42 deletions.
211 changes: 211 additions & 0 deletions robottelo/host_helpers/api_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from contextlib import contextmanager
from datetime import datetime
import time
from unittest.mock import MagicMock

from fauxfactory import gen_ipaddr, gen_mac, gen_string
from nailgun import entity_mixins
from nailgun.client import request
from nailgun.entity_mixins import call_entity_method_with_timeout
import pytest
from requests import HTTPError

from robottelo.config import settings
Expand Down Expand Up @@ -711,6 +713,215 @@ def wait_for_errata_applicability_task(
f'No task was found using query " {search_query} " for host id: {host_id}'
)

def register_host_and_needed_setup(
self,
client,
organization,
activation_key,
environment,
content_view,
enable_repos=False,
rex_key=False,
force=False,
):
"""
Method will setup desired entities to host content. Then, register the
host client to the entities, using associated activation-key.
Attempt to make needed associations between detached entities.
Add desired repos to the content-view prior to calling this helper.
Or, add them to content-view after calling, then publish/promote.
param satellite: sat instance to create/associate needed entities.
param client: instance of a rhel contenthost.
param enable_repos (bool): enable all available repos on the client after registration?
default: False, be sure to enable repos after calling this method.
param rex_key (bool): add a Remote Execution Key to client for satellite?
default: False
param force (bool): force registration of the client to bypass?
default: False
type: Below arguments can be any of the following:
int: pass id of entity to be read
str: pass name of entity to be searched
entity: pass an entity instance
param organization: pass an Organization instance, name, or id to use.
param activation_key: pass an Activation-Key instance, name, or id.
param environment: pass a Lifecycle-Environment instance, name, or id.
for example: can pass string name 'Library'.
param content_view: pass a Content-View instance, name, or id.
for example: can pass string name 'Default Organization View'.
note:
1. Will fail if passed entities do not exist in the same organization.
2. The Default Organization View cannot be published, promoted, edited, or deleted.
but you can register the client to it.
3. This method will not enable any of the repositories for client,
do so after returning the registered host, ie:
client.execute(f'subscription-manager repos --enable {repo_id}')
or client.enable_repo({repo_name}) etc ...
Steps:
1. get needed entities from arguments, id, name, or instance. Read all as instance.
2. publish the content-view if no versions exist, or needs_publish.
3. promote the newest content-view-version if not in environment already. Skip for 'Library'.
4. assign environment and content-view to the activation-key, if not associated.
5. Register the host, using the activation-key associated with content.
return: dict containing the updated entities:
{
'client', 'organization', 'activation_key',
'environment', 'content_view',
}
"""
entities = {
'Organization': organization,
'ActivationKey': activation_key,
'LifecycleEnvironment': environment,
'ContentView': content_view,
}
assert hasattr(client, 'hostname')

# method for updating entities in params dict above,
# called after entity modifications.
def read_entities():
nonlocal entities
entities = {k: v.read() for k, v in entities.items()}

# for entity arguments matched to above params:
# fetch entity instance on satellite
# from given id or name, else read passed argument as an instance.
for entity, value in entities.items():
param = None
# passed int for entity, try to read by id
if isinstance(value, int):
param = getattr(self._satellite.api, entity)(id=value).read()
# passed str, search for entity by name
elif isinstance(value, str):
# search for org name itself, will be just scoped to satellite
if entity == 'Organization':
search_query = f'name="{value}"'
result = getattr(self._satellite.api, entity)().search(
query={'search': search_query}
)
# search of non-org entity by name, will be scoped to organization
else:
search_query = (
f"name='{value}' and organization_id={entities['Organization'].id}"
)
result = getattr(self._satellite.api, entity)(
organization=entities['Organization']
).search(query={'search': search_query})
if not len(result) > 0:
pytest.fail(
f'Could not find {entity} name: {value}, by search query: "{search_query}"'
)
param = result[0]
# did not pass int (id) or str (name), must be readable entity instance
else:
if not hasattr(value, 'id'):
pytest.fail(f'Passed entity {entity}, has no attribute id:\n{value}')
param = value
# updated param, should now be only an entity isntance
assert hasattr(param, 'id'), (
f'Did not get readable instance from parameter on {self._satellite.hostname}:'
f' Param:{entity}:\n{value}'
)
# entity found, read updated instance into dictionary
entities[entity] = param.read()

if ( # publish a content-view-version if none exist, or needs_publish is True
len(entities['ContentView'].version) == 0
or entities['ContentView'].needs_publish is True
):
entities['ContentView'].publish()
read_entities()

# skip for 'Library' env selected or passed arg,
# any published version(s) will already be in Library.
# promote to non-Library env if not already present:
if all(
[
environment != 'Library',
entities['LifecycleEnvironment'].name != 'Library',
entities['LifecycleEnvironment'] not in entities['ContentView'].environment,
]
):
# promote newest version by id
entities['ContentView'].version.sort(key=lambda version: version.id)
entities['ContentView'].version[-1].promote(
data={'environment_ids': entities['LifecycleEnvironment'].id}
)
read_entities()

if ( # assign env to ak if not present
entities['ActivationKey'].environment is None
or entities['ActivationKey'].environment.id != entities['LifecycleEnvironment'].id
):
entities['ActivationKey'].environment = entities['LifecycleEnvironment']
entities['ActivationKey'].update(['environment'])
read_entities()
if ( # assign cv to ak if not present
entities['ActivationKey'].content_view is None
or entities['ActivationKey'].content_view.id != entities['ContentView'].id
):
entities['ActivationKey'].content_view = entities['ContentView']
entities['ActivationKey'].update(['content_view'])

# register with now setup entities, using ak
read_entities()
result = client.register(
activation_keys=entities['ActivationKey'].name,
target=self._satellite,
org=entities['Organization'],
loc=None,
force=force,
)
if result.status != 0:
pytest.fail(f'Failed to register the host: {client.hostname}.\n{result.stderr}')
if not client.subscribed:
pytest.fail(
f"Failed to subscribe the host: {client.hostname}, to content-view: {entities['ContentView'].name}"
)
if rex_key:
client.add_rex_key(self._satellite)
# attempt to enable all repositories available to client,
# ie any repos added to content-view prior to calling this method.
# note: this will fail if no repos are available to client from CV/AK
if enable_repos:
output = client.execute(r'subscription-manager repos --enable \*')
assert output.status == 0, (
'Failed to enable all available repositories using subscription-manager.'
f' For client: {client.hostname}.\n{output.stdout}'
)

# For in-between parameterized sessions:
# unregister the host if it's still subscribed.
request = MagicMock()

@request.addfinalizer
def cleanup():
nonlocal client
if client.subscribed:
client.unregister()
assert (
client.subscribed is False
), f'Failed to fully teardown client {client.hostname}, maintains some content association.'

read_entities()
return ( # dict containing registered host client, and updated entities
{
'client': client,
'organization': entities['Organization'],
'activation_key': entities['ActivationKey'],
'environment': entities['LifecycleEnvironment'],
'content_view': entities['ContentView'],
}
)

def wait_for_syncplan_tasks(self, repo_backend_id=None, timeout=10, repo_name=None):
"""Search the pulp tasks and identify repositories sync tasks with
specified name or backend_identifier
Expand Down
Loading

0 comments on commit 20ce1a4

Please sign in to comment.