-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SCAP report and remediation #1441
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from wait_for import wait_for | ||
|
||
from airgun.entities.base import BaseEntity | ||
from airgun.navigation import NavigateStep, navigator | ||
from airgun.utils import retry_navigation | ||
from airgun.views.oscapreport import ( | ||
RemediateModal, | ||
SCAPReportDetailsView, | ||
SCAPReportView, | ||
) | ||
|
||
|
||
class OSCAPReportEntity(BaseEntity): | ||
endpoint_path = '/compliance/arf_reports' | ||
|
||
def search(self, search_string): | ||
"""Search for SCAP Report | ||
:param search_string: how to find the SCAP Report | ||
:return: result of the SCAP Report search | ||
""" | ||
view = self.navigate_to(self, 'All') | ||
return view.search(search_string) | ||
|
||
def details(self, search_string, widget_names=None, limit=None): | ||
"""Read the content from corresponding SCAP Report dashboard, | ||
clicking on the link in Reported At column of | ||
SCAP Report list | ||
:param search_string: | ||
:param limit: how many rules results to fetch at most | ||
:return: list of dictionaries with values from SCAP Report Details View | ||
""" | ||
view = self.navigate_to(self, 'Details', search_string=search_string) | ||
return view.read(widget_names=widget_names, limit=limit) | ||
|
||
def remediate(self, search_string, resource): | ||
"""Remediate the failed rule using automatic remediation through Ansible | ||
:param search_string: | ||
""" | ||
view = self.navigate_to(self, 'Details', search_string=search_string) | ||
view.table.row(resource=resource).actions.fill('Remediation') | ||
view = RemediateModal(self.browser) | ||
view.wait_displayed() | ||
self.browser.plugin.ensure_page_safe() | ||
wait_for(lambda: view.title.is_displayed, timeout=10, delay=1) | ||
view.fill({'select_remediation_method.snippet': 'Ansible'}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's what this function does as per docstring: |
||
view.select_capsule.run.click() | ||
|
||
|
||
@navigator.register(OSCAPReportEntity, 'All') | ||
class ShowAllSCAPReports(NavigateStep): | ||
"""Navigate to Compliance Reports screen.""" | ||
|
||
VIEW = SCAPReportView | ||
|
||
@retry_navigation | ||
def step(self, *args, **kwargs): | ||
self.view.menu.select('Hosts', 'Compliance', 'Reports') | ||
|
||
|
||
@navigator.register(OSCAPReportEntity, 'Details') | ||
class DetailsSCAPReport(NavigateStep): | ||
"""To get data from ARF report view | ||
Args: | ||
search_string: what to fill to find the SCAP report | ||
""" | ||
|
||
VIEW = SCAPReportDetailsView | ||
|
||
def prerequisite(self, *args, **kwargs): | ||
return self.navigate_to(self.obj, 'All') | ||
|
||
def step(self, *args, **kwargs): | ||
search_string = kwargs.get('search_string') | ||
self.parent.search(search_string) | ||
self.parent.table.row()['Reported At'].widget.click() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,21 +56,28 @@ def select_logout(self): | |
self.account_menu.click() | ||
self.logout.click() | ||
|
||
def read(self, widget_names=None): | ||
def read(self, widget_names=None, limit=None): | ||
lhellebr marked this conversation as resolved.
Show resolved
Hide resolved
lhellebr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Reads the contents of the view and presents them as a dictionary. | ||
|
||
:param widget_names: If specified, will read only the widgets names in the list. | ||
:param limit: how many entries to fetch at most | ||
|
||
:return: A :py:class:`dict` of ``widget_name: widget_read_value`` | ||
where the values are retrieved using the :py:meth:`Widget.read`. | ||
""" | ||
if widget_names is None: | ||
if limit is not None: | ||
raise NotImplementedError("You must specify widgets to be able to specify limit") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/widgets/widget_names is that right ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well... you specify widgets by their names |
||
return super().read() | ||
if not isinstance(widget_names, list | tuple): | ||
widget_names = [widget_names] | ||
values = {} | ||
for widget_name in widget_names: | ||
values[widget_name] = get_widget_by_name(self, widget_name).read() | ||
widget = get_widget_by_name(self, widget_name) | ||
if hasattr(widget, 'read_limited') and callable(widget.read_limited): | ||
values[widget_name] = widget.read(limit=limit) | ||
else: | ||
values[widget_name] = widget.read() | ||
return normalize_dict_values(values) | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
from widgetastic.widget import Text, View | ||
from widgetastic_patternfly4 import Button | ||
from widgetastic_patternfly4.ouia import FormSelect | ||
|
||
from airgun.views.common import BaseLoggedInView, SearchableViewMixin, WizardStepView | ||
from airgun.widgets import ( | ||
ActionsDropdown, | ||
SatTable, | ||
) | ||
|
||
|
||
class SCAPReportView(BaseLoggedInView, SearchableViewMixin): | ||
title = Text("//h1[normalize-space(.)='Compliance Reports']") | ||
table = SatTable( | ||
'.//table', | ||
column_widgets={ | ||
'Host': Text(".//a[contains(@href,'/new/hosts')]"), | ||
'Reported At': Text(".//a[contains(@href,'/compliance/arf_reports')]"), | ||
'Policy': Text(".//a[contains(@href,'/compliance/policies')]"), | ||
'Openscap Capsule': Text(".//a[contains(@href,'/smart_proxies')]"), | ||
'Passed': Text(".//span[contains(@class,'label-info')]"), | ||
'Failed': Text(".//span[contains(@class,'label-danger')]"), | ||
'Other': Text(".//span[contains(@class,'label-warning')]"), | ||
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"), | ||
}, | ||
) | ||
|
||
@property | ||
def is_displayed(self): | ||
return self.browser.wait_for_element(self.title, exception=False) is not None | ||
|
||
|
||
class SCAPReportDetailsView(BaseLoggedInView): | ||
show_log_messages_label = Text('//span[normalize-space(.)="Show log messages:"]') | ||
table = SatTable( | ||
'.//table', | ||
column_widgets={ | ||
'Result': Text('./span[1]'), | ||
'Message': Text('./span[2]'), | ||
'Resource': Text('./span[3]'), | ||
'Severity': Text('./img[1]'), | ||
'Actions': ActionsDropdown("./div[contains(@class, 'btn-group')]"), | ||
}, | ||
) | ||
|
||
@property | ||
def is_displayed(self): | ||
return ( | ||
self.browser.wait_for_element(self.show_log_messages_label, exception=False) is not None | ||
) | ||
|
||
|
||
class RemediateModal(View): | ||
""" | ||
Class representing the "Remediate" modal. | ||
It contains multiple nested classes each representing a step of the wizard. | ||
""" | ||
|
||
ROOT = '//div[contains(@data-ouia-component-id, "OUIA-Generated-Modal-large-")]' | ||
|
||
title = Text('.//h2[contains(@class, "pf-c-title")]') | ||
close_modal = Button(locator='.//button[@aria-label="Close"]') | ||
|
||
@View.nested | ||
class select_remediation_method(WizardStepView): | ||
expander = Text( | ||
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Select snippet")]' | ||
) | ||
snippet = FormSelect('snippet-select') | ||
|
||
@View.nested | ||
class name_source(WizardStepView): | ||
expander = Text( | ||
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review hosts")]' | ||
) | ||
host_table = SatTable(".//table") | ||
|
||
@View.nested | ||
class select_capsule(WizardStepView): | ||
expander = Text( | ||
'.//button[contains(@class,"pf-c-wizard__nav-link") and contains(.,"Review remediation")]' | ||
) | ||
run = Button(locator='.//button[normalize-space(.)="Run"]') |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1929,11 +1929,56 @@ def has_rows(self): | |
return False | ||
return True | ||
|
||
def read(self): | ||
def read_limited(self, limit): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lhellebr nice addition! |
||
"""This is almost the same as inherited read but has a limit. Use it for tables that take too long to read. | ||
Reads the table. Returns a list, every item in the list is contents read from the row.""" | ||
rows = list(self) | ||
# Cut the unwanted rows if necessary | ||
if self.rows_ignore_top is not None: | ||
rows = rows[self.rows_ignore_top :] | ||
if self.rows_ignore_bottom is not None and self.rows_ignore_bottom > 0: | ||
rows = rows[: -self.rows_ignore_bottom] | ||
if self.assoc_column_position is None: | ||
ret = [] | ||
rows_read = 0 | ||
for row in rows: | ||
if rows_read >= limit: | ||
break | ||
ret.append(row.read()) | ||
rows_read = rows_read + 1 | ||
return ret | ||
else: | ||
result = {} | ||
rows_read = 0 | ||
for row in rows: | ||
if rows_read >= limit: | ||
break | ||
row_read = row.read() | ||
try: | ||
key = row_read.pop(self.header_index_mapping[self.assoc_column_position]) | ||
except KeyError: | ||
try: | ||
key = row_read.pop(self.assoc_column_position) | ||
except KeyError: | ||
try: | ||
key = row_read.pop(self.assoc_column) | ||
except KeyError as e: | ||
raise ValueError( | ||
f"The assoc_column={self.assoc_column!r} could not be retrieved" | ||
) from e | ||
if key in result: | ||
raise ValueError(f"Duplicate value for {key}={result[key]!r}") | ||
result[key] = row_read | ||
rows_read = rows_read + 1 | ||
return result | ||
|
||
def read(self, limit=None): | ||
"""Return empty list in case table is empty""" | ||
if not self.has_rows: | ||
self.logger.debug(f'Table {self.locator} is empty') | ||
return [] | ||
if limit is not None: | ||
return self.read_limited(limit) | ||
if self.pagination.is_displayed: | ||
return self._read_all() | ||
return super().read() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this search string ? is this any type of entity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately a link to the correct arf report is in the column
Reported At
and you get there by clicking the link saying e.g.1 day ago
... so search actually searches by a (not visible) id in the link.You call it like this: