Skip to content

Commit

Permalink
API-factory setup registered host method
Browse files Browse the repository at this point in the history
  • Loading branch information
damoore044 committed May 7, 2024
1 parent 1f85547 commit 8ac5e8f
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 42 deletions.
209 changes: 209 additions & 0 deletions robottelo/host_helpers/api_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from contextlib import contextmanager
from datetime import datetime
import time
from unittest.mock import MagicMock

from fauxfactory import gen_ipaddr, gen_mac, gen_string
from nailgun import entity_mixins
from nailgun.client import request
from nailgun.entity_mixins import call_entity_method_with_timeout
import pytest
from requests import HTTPError

from robottelo.config import settings
Expand Down Expand Up @@ -711,6 +713,213 @@ def wait_for_errata_applicability_task(
f'No task was found using query " {search_query} " for host id: {host_id}'
)

def register_host_and_needed_setup(
self,
client,
organization,
activation_key,
environment,
content_view,
enable_repos=False,
rex_key=False,
force=False,
):
"""
Method will setup desired entities to host content. Then, register the
host client to the entities, using associated activation-key.
Attempt to make needed associations between detached entities.
Add desired repos to the content-view prior to calling this helper.
Or, add them to content-view after calling, then publish/promote.
param satellite: sat instance to create/associate needed entities.
param client: instance of a rhel contenthost.
param enable_repos (bool): enable all available repos on the client after registration?
default: False, be sure to enable repos after calling this method.
param rex_key (bool): add a Remote Execution Key to client for satellite?
default: False
param force (bool): force registration of the client to bypass?
default: False
type: Below arguments can be any of the following:
int: pass id of entity to be read
str: pass name of entity to be searched
entity: pass an entity instance
param organization: pass an Organization instance, name, or id to use.
param activation_key: pass an Activation-Key instance, name, or id.
param environment: pass a Lifecycle-Environment instance, name, or id.
for example: can pass string name 'Library'.
param content_view: pass a Content-View instance, name, or id.
for example: can pass string name 'Default Organization View'.
Notes:
1. Will fail if passed entities do not exist in the same organization.
2. You can use param enable_repos, to try enabling any repositories added to passed content-view for client,
but if there are no available repositories from content-view, it will fail.
3. The Default Organization View cannot be published, promoted, edited, or deleted.
but you can register the client to it.
Steps:
1. get needed entities from arguments, id, name, or instance. Read all as instance.
2. publish the content-view if no versions exist, or needs_publish.
3. promote the newest content-view-version if not in environment already. Skip for 'Library'.
4. assign environment and content-view to the activation-key, if not associated.
5. Register the host, using the activation-key associated with content.
return: dict containing the updated entities:
{
'client', 'organization', 'activation_key',
'environment', 'content_view',
}
"""
entities = {
'Organization': organization,
'ActivationKey': activation_key,
'LifecycleEnvironment': environment,
'ContentView': content_view,
}
assert hasattr(client, 'hostname')

# method for updating entities in params dict above,
# called after entity modifications.
def read_entities():
nonlocal entities
entities = {k: v.read() for k, v in entities.items()}

# for entity arguments matched to above params:
# fetch entity instance on satellite
# from given id or name, else read passed argument as an instance.
for entity, value in entities.items():
param = None
# passed int for entity, try to read by id
if isinstance(value, int):
param = getattr(self._satellite.api, entity)(id=value).read()
# passed str, search for entity by name
elif isinstance(value, str):
# search for org name itself, will be just scoped to satellite
if entity == 'Organization':
search_query = f'name="{value}"'
result = getattr(self._satellite.api, entity)().search(
query={'search': search_query}
)
# search of non-org entity by name, will be scoped to organization
else:
search_query = (
f"name='{value}' and organization_id={entities['Organization'].id}"
)
result = getattr(self._satellite.api, entity)(
organization=entities['Organization']
).search(query={'search': search_query})
if not len(result) > 0:
pytest.fail(
f'Could not find {entity} name: {value}, by search query: "{search_query}"'
)
param = result[0]
# did not pass int (id) or str (name), must be readable entity instance
else:
if not hasattr(value, 'id'):
pytest.fail(f'Passed entity {entity}, has no attribute id:\n{value}')
param = value
# updated param, should now be only an entity isntance
assert hasattr(param, 'id'), (
f'Did not get readable instance from parameter on {self._satellite.hostname}:'
f' Param:{entity}:\n{value}'
)
# entity found, read updated instance into dictionary
entities[entity] = param.read()

if ( # publish a content-view-version if none exist, or needs_publish is True
len(entities['ContentView'].version) == 0
or entities['ContentView'].needs_publish is True
):
entities['ContentView'].publish()
read_entities()

# promote to non-Library env if not already present:
# skip for 'Library' env selected or passed arg,
# any published version(s) will already be in Library.
if all(
[
environment != 'Library',
entities['LifecycleEnvironment'].name != 'Library',
entities['LifecycleEnvironment'] not in entities['ContentView'].environment,
]
):
# promote newest version by id
entities['ContentView'].version.sort(key=lambda version: version.id)
entities['ContentView'].version[-1].promote(
data={'environment_ids': entities['LifecycleEnvironment'].id}
)
read_entities()

if ( # assign env to ak if not present
entities['ActivationKey'].environment is None
or entities['ActivationKey'].environment.id != entities['LifecycleEnvironment'].id
):
entities['ActivationKey'].environment = entities['LifecycleEnvironment']
entities['ActivationKey'].update(['environment'])
read_entities()
if ( # assign cv to ak if not present
entities['ActivationKey'].content_view is None
or entities['ActivationKey'].content_view.id != entities['ContentView'].id
):
entities['ActivationKey'].content_view = entities['ContentView']
entities['ActivationKey'].update(['content_view'])

# register with now setup entities, using ak
read_entities()
result = client.register(
activation_keys=entities['ActivationKey'].name,
target=self._satellite,
org=entities['Organization'],
loc=None,
force=force,
)
if result.status != 0:
pytest.fail(f'Failed to register the host: {client.hostname}.\n{result.stderr}')
if not client.subscribed:
pytest.fail(
f"Failed to subscribe the host: {client.hostname}, to content-view: {entities['ContentView'].name}"
)
if rex_key:
client.add_rex_key(self._satellite)
# attempt to enable all repositories available to client,
# ie any repos added to content-view prior to calling this method.
# note: this will fail if no repos are available to client from CV/AK
if enable_repos:
output = client.execute(r'subscription-manager repos --enable \*')
assert output.status == 0, (
'Failed to enable all available repositories using subscription-manager.'
f' For client: {client.hostname}.\n{output.stdout}'
)

# For in-between parameterized sessions:
# unregister the host if it's still subscribed.
request = MagicMock()

@request.addfinalizer
def cleanup():
nonlocal client
if client.subscribed:
client.unregister()
assert (
client.subscribed is False
), f'Failed to fully teardown client {client.hostname}, maintains some content association.'

read_entities()
return ( # dict containing registered host client, and updated entities
{
'client': client,
'organization': entities['Organization'],
'activation_key': entities['ActivationKey'],
'environment': entities['LifecycleEnvironment'],
'content_view': entities['ContentView'],
}
)

def wait_for_syncplan_tasks(self, repo_backend_id=None, timeout=10, repo_name=None):
"""Search the pulp tasks and identify repositories sync tasks with
specified name or backend_identifier
Expand Down
Loading

0 comments on commit 8ac5e8f

Please sign in to comment.