diff --git a/src/ploigos_step_runner/results/workflow_result.py b/src/ploigos_step_runner/results/workflow_result.py
index 4109ead9a..3375fe741 100644
--- a/src/ploigos_step_runner/results/workflow_result.py
+++ b/src/ploigos_step_runner/results/workflow_result.py
@@ -35,8 +35,8 @@ def get_artifact_value(
): # pylint: disable=too-many-boolean-expressions
"""Search for an artifact.
- If step_name, sub_step_name, or environment are provided ensure the artifact comes
- from the first
+ If step_name, sub_step_name, or environment are not provided ensure the artifact comes
+ from the last step that returned that artifact.
1. if step_name is provided, look for the artifact in the step
2. elif step_name and sub_step_name is provided, look for the artifact in the step/sub_step
diff --git a/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py b/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py
index b5e534237..efea4d121 100644
--- a/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py
+++ b/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py
@@ -14,16 +14,15 @@
Configuration Key | Required? | Default | Description
-------------------------------|-----------|---------|-----------
-`image-tar-file` | Yes | | Path to container image tar file to scan
+`container-image-tag` | Yes | | Container image tag to scan.
`oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \
to do the evaluation with. \
- Must use protocol file://|http://|https://.
- | | | Must have file extension .xml|.bz2.
-`oscap-profile` | Yes | | OpenSCAP profile to evaluate.
+ Must use protocol file://|http://|https://. \
+ Must have file extension .xml|.bz2.
+`oscap-profile` | No | | OpenSCAP profile to evaluate.
`oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \
to do the evaluation with. \
- Must use protocol \
- file://|http://|https://. \
+ Must use protocol file://|http://|https://. \
Must have file extension .xml|.bz2.
`oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \
that have remote references fetch them if \
@@ -34,6 +33,12 @@
remote resources and this is not True. \
For disconnected environments the remote \
internal mirror.
+`[container-image-pull-repository-type, container-image-repository-type]` \
+ | Yes | 'containers-storage:' \
+ | \
+ Container repository type for the pull image source. \
+ See https://github.com/containers/skopeo for valid \
+ options.
Result Artifacts
----------------
diff --git a/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py b/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py
index e45496753..f46d847ed 100644
--- a/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py
+++ b/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py
@@ -13,17 +13,16 @@
* previous step results
Configuration Key | Required? | Default | Description
--------------------------------|-----------|---------|------------
-`image-tar-file` | Yes | | Path to container image tar file to scan
+-------------------------------|-----------|---------|-----------
+`container-image-tag` | Yes | | Container image tag to scan.
`oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \
to do the evaluation with. \
- Must use protocol file://|http://|https://.
- | | | Must have file extension .xml|.bz2.
+ Must use protocol file://|http://|https://. \
+ Must have file extension .xml|.bz2.
`oscap-profile` | No | | OpenSCAP profile to evaluate.
`oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \
to do the evaluation with. \
- Must use protocol \
- file://|http://|https://. \
+ Must use protocol file://|http://|https://. \
Must have file extension .xml|.bz2.
`oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \
that have remote references fetch them if \
@@ -34,6 +33,12 @@
remote resources and this is not True. \
For disconnected environments the remote \
internal mirror.
+`[container-image-pull-repository-type, container-image-repository-type]` \
+ | Yes | 'containers-storage:' \
+ | \
+ Container repository type for the pull image source. \
+ See https://github.com/containers/skopeo for valid \
+ options.
Result Artifacts
----------------
diff --git a/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py b/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py
index 827804a80..a1b31dd73 100644
--- a/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py
+++ b/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py
@@ -9,41 +9,64 @@
* runtime configuration
* previous step results
-Configuration Key | Required? | Default | Description
-------------------|-----------|---------|-----------
-`imagespecfile` | True | `'Containerfile'` \
- | File defining the container image
-`context` | True | `'.'` | Context to build the container image in
-`tls-verify` | True | `True` | Whether to verify TLS when pulling parent images
-`format` | True | `'oci'` | format of the built image's manifest and metadata
+Configuration Key | Required? | Default | Description
+-------------------|-----------|---------|-----------
+`imagespecfile` | True | `'Containerfile'` \
+ | File defining the container image
+`context` | True | `'.'` | Context to build the container image in
+`tls-verify` | True | `True` | Whether to verify TLS when pulling parent images
+`format` | True | `'oci'` | format of the built image's manifest and metadata
`containers-config-auth-file` \
- | True | `'~/.buildah-auth.json'` \
- | Path to the container registry authentication \
- file to use for container registry authentication.
+ | False | | Path to the container registry authentication \
+ file to use for container registry authentication. \
+ If one is not provided one will be created in the \
+ working directory.
`container-image-version` \
- | True | | Version to use when building the container image
+ | True | | Version to use when building the container image
+`organization` | True | | Used in built container image tag
+`application_name` | True | | Used in built container image tag
+`service_name` | True | | Used in built container image tag
+`container-registries` \
+ | False | | Hash of container registries to authenticate with.
Result Artifacts
----------------
Results artifacts output by this step.
-Result Artifact Key | Description
---------------------------|------------
-`container-image-version` | Container version to tag built image with
-`image-tar-file` | Path to the built container image as a tar file
-"""
+Result Artifact Key | Description
+-------------------------------|------------
+`container-image-registry-uri` | Registry URI poriton of the container image tag \
+ of the built container image.
+`container-image-registry-organization` \
+ | Organization portion of the container image tag \
+ of the built container image.
+`container-image-repository` | Repository portion of the container image tag \
+ of the built container image.
+`container-image-name` | Another way to reference the \
+ repository portion of the container image tag \
+ of the built container image.
+`container-image-version` | Version portion of the container image tag \
+ of the built container image.
+`container-image-tag` | Full container image tag of the built container, \
+ including the registry URI.
\
+ Takes the form of: \
+ `container-image-registry-organization/container-image-repository:container-image-version`
+`container-image-short-tag` | Short container image tag of the built container image, \
+ excluding the registry URI.
\
+ Takes the form of: \
+ `container-image-registry-uri/container-image-registry-organization/container-image-repository:container-image-version`
+
+""" # pylint: disable=line-too-long
+
import os
import sys
-from pathlib import Path
+from distutils import util
import sh
from ploigos_step_runner import StepImplementer, StepResult
from ploigos_step_runner.utils.containers import container_registries_login
DEFAULT_CONFIG = {
- # Path to the container registry authentication file to read and write to/from.
- 'containers-config-auth-file': os.path.join(Path.home(), '.buildah-auth.json'),
-
# Image specification file name
'imagespecfile': 'Containerfile',
@@ -58,11 +81,11 @@
}
REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [
- 'containers-config-auth-file',
'imagespecfile',
'context',
'tls-verify',
'format',
+ 'organization',
'service-name',
'application-name'
]
@@ -103,6 +126,29 @@ def _required_config_or_result_keys():
"""
return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS
+ def _validate_required_config_or_previous_step_result_artifact_keys(self):
+ """Validates that the required configuration keys or previous step result artifacts
+ are set and have valid values.
+
+ Validates that:
+ * required configuration is given
+ * given 'imagespecfile' exists
+
+ Raises
+ ------
+ AssertionError
+ If step configuration or previous step result artifacts have invalid required values
+ """
+ super()._validate_required_config_or_previous_step_result_artifact_keys()
+
+ # if pom-file has value verify file exists
+ # If it doesn't have value and is required function will have already failed
+ image_spec_file = self.get_value('imagespecfile')
+ context = self.get_value('context')
+ image_spec_file_full_path = os.path.join(context, image_spec_file)
+ assert os.path.exists(image_spec_file_full_path), \
+ f'Given imagespecfile ({image_spec_file}) does not exist in given context ({context}).'
+
def _run_step(self):
"""Runs the step implemented by this StepImplementer.
@@ -113,38 +159,33 @@ def _run_step(self):
"""
step_result = StepResult.from_step_implementer(self)
- context = self.get_value('context')
+ # get config
image_spec_file = self.get_value('imagespecfile')
- image_spec_file_location = os.path.join(context, image_spec_file)
- application_name = self.get_value('application-name')
- service_name = self.get_value('service-name')
tls_verify = self.get_value('tls-verify')
+ if isinstance(tls_verify, str):
+ tls_verify = bool(util.strtobool(tls_verify))
- if not os.path.exists(image_spec_file_location):
- step_result.success = False
- step_result.message = 'Image specification file does not exist in location: ' \
- f'{image_spec_file_location}'
- return step_result
-
- image_tag_version = self.get_value('container-image-version')
- if image_tag_version is None:
- image_tag_version = 'latest'
+ # create local build tag
+ image_version = self.get_value('container-image-version')
+ if image_version is None:
+ image_version = 'latest'
print('No image tag version found in metadata. Using latest')
-
- destination = "localhost/{application_name}/{service_name}".format(
- application_name=application_name,
- service_name=service_name
- )
- tag = "{destination}:{version}".format(
- destination=destination,
- version=image_tag_version
- )
+ image_registry_uri = 'localhost'
+ image_registry_organization = self.get_value('organization')
+ image_repository = f"{self.get_value('application-name')}-{self.get_value('service-name')}"
+ short_tag = f"{image_registry_organization}/{image_repository}:{image_version}"
+ build_tag = f"{image_registry_uri}/{short_tag}"
try:
# login to any provider container registries
# NOTE: important to specify the auth file because depending on the context this is
# being run in python process may not have permissions to default location
containers_config_auth_file = self.get_value('containers-config-auth-file')
+ if not containers_config_auth_file:
+ containers_config_auth_file = os.path.join(
+ self.work_dir_path,
+ 'container-auth.json'
+ )
container_registries_login(
registries=self.get_value('container-registries'),
containers_config_auth_file=containers_config_auth_file,
@@ -156,48 +197,61 @@ def _run_step(self):
'--format=' + self.get_value('format'),
'--tls-verify=' + str(tls_verify).lower(),
'--layers', '-f', image_spec_file,
- '-t', tag,
+ '-t', build_tag,
'--authfile', containers_config_auth_file,
- context,
+ self.get_value('context'),
_out=sys.stdout,
_err=sys.stderr,
_tee='err'
)
- step_result.add_artifact(
- name='container-image-version',
- value=tag
- )
except sh.ErrorReturnCode as error: # pylint: disable=undefined-variable
step_result.success = False
step_result.message = 'Issue invoking buildah bud with given image ' \
f'specification file ({image_spec_file}): {error}'
return step_result
- image_tar_file = f'image-{application_name}-{service_name}-{image_tag_version}.tar'
- image_tar_path = os.path.join(self.work_dir_path, image_tar_file)
- try:
- # Check to see if the tar docker-archive file already exists
- # this needs to be run as buildah does not support overwritting
- # existing files.
- if os.path.exists(image_tar_path):
- os.remove(image_tar_path)
- sh.buildah.push( # pylint: disable=no-member
- tag,
- "docker-archive:" + image_tar_path,
- _out=sys.stdout,
- _err=sys.stderr,
- _tee='err'
- )
-
- step_result.add_artifact(
- name='image-tar-file',
- value=image_tar_path
- )
- except sh.ErrorReturnCode as error: # pylint: disable=undefined-variable
- step_result.success = False
- step_result.message = f'Issue invoking buildah push to tar file ' \
- f'({image_tar_path}): {error}'
- return step_result
+ # add artifacts
+ step_result.add_artifact(
+ name='container-image-registry-uri',
+ value=image_registry_uri,
+ description='Registry URI poriton of the container image tag' \
+ ' of the built container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-registry-organization',
+ value=image_registry_organization,
+ description='Organization portion of the container image tag' \
+ ' of the built container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-repository',
+ value=image_repository,
+ description='Repository portion of the container image tag' \
+ ' of the built container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-name',
+ value=image_repository,
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the built container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-version',
+ value=image_version,
+ description='Version portion of the container image tag of the built container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-tag',
+ value=build_tag,
+ description='Full container image tag of the built container,' \
+ ' including the registry URI.'
+ )
+ step_result.add_artifact(
+ name='container-image-short-tag',
+ value=short_tag,
+ description='Short container image tag of the built container image,' \
+ ' excluding the registry URI.'
+ )
return step_result
diff --git a/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py b/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py
index 512b86455..8b096decb 100644
--- a/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py
+++ b/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py
@@ -9,66 +9,97 @@
* runtime configuration
* previous step results
-Configuration Key | Required? | Default | Description
-------------------|-----------|----------|-----------
-`destination-url` | Yes | | Container image repository destination to push image \
- to. o not include the `docker://` prefix as it will \
- automatically be applied
-`src-tls-verify` | Yes | `'true'` | Whether to very TLS for source of image
-`dest-tls-verify` | Yes | `'true'` | Whether to verify TLS for destination of image
-`containers-config-auth-file` | Yes | `'~/.container-image-repo-auth.json'` | \
- Path to the container registry authentication file \
- to use for container registry authentication.
-`container-image-version` | Yes | | Tag to push container image with
-`image-tar-file` | Yes | | Local tar file of container image to push
+Configuration Key | Required? | Default | Description
+-------------------|-----------|---------|-----------
+`container-image-version` \
+ | Yes | | Version to use when pushing the container image
+`organization` | Yes | | Used in creating the container image push tag
+`application_name` | Yes | | Used in creating the container image push tag
+`service_name` | Yes | | Used in creating the container image push tag
+`destination-url` | Yes | | Container image repository destination to push image \
+ to.
\
+ Should not include the repository type.
+`[source-tls,verify, src-tls-verify]` \
+ | Yes | `True` | Whether to verify TLS when pulling source image.
+`dest-tls-verify` | Yes | `True` | Whether to verify TLS when pushing destination image.
+`[container-image-pull-tag, container-image-tag]` \
+ | Yes | | Container image tag of image to push to \
+ `destination-url`.
+`[container-image-pull-repository-type, container-image-repository-type]` \
+ | Yes | 'containers-storage:' \
+ | Container repository type for the pull image source. \
+ See https://github.com/containers/skopeo for valid \
+ options.
+`[container-image-push-repository-type, container-image-repository-type]` \
+ | Yes | 'docker://' \
+ | Container repository type for the push image source. \
+ See https://github.com/containers/skopeo for valid \
+ options.
+`containers-config-auth-file` \
+ | No | | Path to the container registry authentication file \
+ to use for container registry authentication. \
+ If one is not provided one will be created in the \
+ working directory.
+`container-registries` \
+ | False | | Hash of container registries to authenticate with.
+
Result Artifacts
----------------
Results artifacts output by this step.
-Result Artifact Key | Description
-----------------------------------------|------------
-`container-image-registry-uri` | URI to the image registry service.
-`container-image-registry-organization` | Organization in the image registry to push the image to.
-`container-image-repository` | Repository in the Organization in the image registry to \
- push the image to.
-`container-image-name` | Name of the image to push to the Image Repository. \
- This is the same value as `container-image-repository` as\
- these are always the same, but people refer to them \
- differently in different cases, so providing both.
-`container-image-version` | Version of the image to push.
-`container-image-tag` | Tag container image was pushed with.
\
- Takes the form of: \
- "`container-image-registry-uri`\
- /`container-image-registry-organization`\
- /`container-image-repository`\
- :`container-image-version`"
-"""
+Result Artifact Key | Description
+-------------------------------|------------
+`container-image-registry-uri` | Registry URI poriton of the container image tag \
+ of the pushed container image.
+`container-image-registry-organization` \
+ | Organization portion of the container image tag \
+ of the pushed container image.
+`container-image-repository` | Repository portion of the container image tag \
+ of the pushed container image.
+`container-image-name` | Another way to reference the \
+ repository portion of the container image tag \
+ of the pushed container image.
+`container-image-version` | Version portion of the container image tag \
+ of the pushed container image.
+`container-image-tag` | Full container image tag of the pushed container, \
+ including the registry URI.
\
+ Takes the form of: \
+ `container-image-registry-organization/container-image-repository:container-image-version`
+`container-image-short-tag` | Short container image tag of the pushed container image, \
+ excluding the registry URI.
\
+ Takes the form of: \
+ `container-image-registry-uri/container-image-registry-organization/container-image-repository:container-image-version`
+
+""" # pylint: disable=line-too-long
+
import os
import sys
from distutils import util
-from pathlib import Path
import sh
from ploigos_step_runner import StepImplementer, StepResult
from ploigos_step_runner.utils.containers import container_registries_login
DEFAULT_CONFIG = {
- 'src-tls-verify': 'true',
- 'dest-tls-verify': 'true',
- 'containers-config-auth-file': os.path.join(Path.home(), '.container-image-repo-auth.json')
+ 'src-tls-verify': True,
+ 'dest-tls-verify': True,
+ 'container-image-pull-repository-type': 'containers-storage:',
+ 'container-image-push-repository-type': 'docker://'
}
REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [
- 'containers-config-auth-file',
'destination-url',
- 'src-tls-verify',
+ ['source-tls,verify', 'src-tls-verify'],
'dest-tls-verify',
'service-name',
'application-name',
'organization',
- 'container-image-version',
- 'image-tar-file'
+
+ # being flexible for different use cases of proceeding steps
+ ['container-image-pull-tag', 'container-image-tag'],
+ ['container-image-pull-repository-type', 'container-image-repository-type'],
+ ['container-image-push-repository-type', 'container-image-repository-type']
]
class Skopeo(StepImplementer):
@@ -117,55 +148,108 @@ def _run_step(self):
"""
step_result = StepResult.from_step_implementer(self)
- image_version = self.get_value('container-image-version').lower()
- application_name = self.get_value('application-name')
- service_name = self.get_value('service-name')
- organization = self.get_value('organization')
- image_tar_file = self.get_value('image-tar-file')
- destination_url = self.get_value('destination-url')
+ # get config
+ image_pull_tag = self.get_value(['container-image-pull-tag', 'container-image-tag'])
+ pull_repository_type = self.get_value([
+ 'container-image-pull-repository-type',
+ 'container-image-repository-type'
+ ])
+ push_repository_type = self.get_value([
+ 'container-image-push-repository-type',
+ 'container-image-repository-type'
+ ])
dest_tls_verify = self.get_value('dest-tls-verify')
-
- image_registry_uri = destination_url
- image_registry_organization = organization
- image_repository = f"{application_name}-{service_name}"
- image_tag = f"{image_registry_uri}/{image_registry_organization}" \
- f"/{image_repository}:{image_version}"
+ if isinstance(dest_tls_verify, str):
+ dest_tls_verify = bool(util.strtobool(dest_tls_verify))
+ source_tls_verify = self.get_value(['source-tls-verify', 'src-tls-verify'])
+ if isinstance(source_tls_verify, str):
+ source_tls_verify = bool(util.strtobool(source_tls_verify))
+
+ # create push tag
+ image_version = self.get_value('container-image-version')
+ if image_version is None:
+ image_version = 'latest'
+ print('No image tag version found in metadata. Using latest')
+ image_version = image_version.lower()
+ image_registry_uri = self.get_value('destination-url')
+ image_registry_organization = self.get_value('organization')
+ image_repository = f"{self.get_value('application-name')}-{self.get_value('service-name')}"
+ image_push_short_tag = f"{image_registry_organization}/{image_repository}:{image_version}"
+ image_push_tag = f"{image_registry_uri}/{image_push_short_tag}"
try:
# login to any provider container registries
# NOTE: important to specify the auth file because depending on the context this is
# being run in python process may not have permissions to default location
containers_config_auth_file = self.get_value('containers-config-auth-file')
+ if not containers_config_auth_file:
+ containers_config_auth_file = os.path.join(
+ self.work_dir_path,
+ 'container-auth.json'
+ )
container_registries_login(
registries=self.get_value('container-registries'),
containers_config_auth_file=containers_config_auth_file,
- containers_config_tls_verify=util.strtobool(dest_tls_verify)
+ containers_config_tls_verify=dest_tls_verify
)
# push image
sh.skopeo.copy( # pylint: disable=no-member
- f"--src-tls-verify={str(self.get_value('src-tls-verify'))}",
- f"--dest-tls-verify={str(self.get_value('dest-tls-verify'))}",
+ f"--src-tls-verify={str(source_tls_verify).lower()}",
+ f"--dest-tls-verify={str(dest_tls_verify).lower()}",
f"--authfile={containers_config_auth_file}",
- f'docker-archive:{image_tar_file}',
- f'docker://{image_tag}',
+ f'{pull_repository_type}{image_pull_tag}',
+ f'{push_repository_type}{image_push_tag}',
_out=sys.stdout,
_err=sys.stderr,
_tee='err'
)
except sh.ErrorReturnCode as error:
step_result.success = False
- step_result.message = f'Error pushing container image ({image_tar_file}) ' \
- f' to tag ({image_tag}) using skopeo: {error}'
+ step_result.message = f'Error pushing container image ({image_pull_tag}) ' \
+ f' to tag ({image_push_tag}) using skopeo: {error}'
- step_result.add_artifact(name='container-image-registry-uri', value=image_registry_uri)
+ # add artifacts
+ step_result.add_artifact(
+ name='container-image-registry-uri',
+ value=image_registry_uri,
+ description='Registry URI poriton of the container image tag' \
+ ' of the pushed container image.'
+ )
step_result.add_artifact(
name='container-image-registry-organization',
- value=image_registry_organization
+ value=image_registry_organization,
+ description='Organization portion of the container image tag' \
+ ' of the pushed container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-repository',
+ value=image_repository,
+ description='Repository portion of the container image tag' \
+ ' of the pushed container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-name',
+ value=image_repository,
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the pushed container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-version',
+ value=image_version,
+ description='Version portion of the container image tag of the pushed container image.'
+ )
+ step_result.add_artifact(
+ name='container-image-tag',
+ value=image_push_tag,
+ description='Full container image tag of the pushed container,' \
+ ' including the registry URI.'
+ )
+ step_result.add_artifact(
+ name='container-image-short-tag',
+ value=image_push_short_tag,
+ description='Short container image tag of the pushed container image,' \
+ ' excluding the registry URI.'
)
- step_result.add_artifact(name='container-image-repository', value=image_repository)
- step_result.add_artifact(name='container-image-name', value=image_repository)
- step_result.add_artifact(name='container-image-version', value=image_version)
- step_result.add_artifact(name='container-image-tag', value=image_tag)
return step_result
diff --git a/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py b/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py
index 69e3d9ab6..654e56cbb 100644
--- a/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py
+++ b/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py
@@ -9,16 +9,16 @@
Configuration Key | Required? | Default | Description
-------------------------------|-----------|---------|-----------
-`image-tar-file` | Yes | | Path to container image tar file to scan
+`container-image-tag` | Yes | | Container image tag to scan.
`oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \
- to do the evaluation with. \
- Must use protocol file://|http://|https://. \
- Must have file extension .xml|.bz2.
+ to do the evaluation with. \
+ Must use protocol file://|http://|https://. \
+ Must have file extension .xml|.bz2.
`oscap-profile` | No | | OpenSCAP profile to evaluate.
`oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \
- to do the evaluation with. \
- Must use protocol file://|http://|https://. \
- Must have file extension .xml|.bz2.
+ to do the evaluation with. \
+ Must use protocol file://|http://|https://. \
+ Must have file extension .xml|.bz2.
`oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \
that have remote references fetch them if \
True, else don't. \
@@ -28,15 +28,12 @@
remote resources and this is not True. \
For disconnected environments the remote \
internal mirror.
-
-Expected Previous Step Results
-------------------------------
-
-Results expected from previous steps that this step requires.
-
-| Step Name | Result Key | Description
-|--------------------------|------------------|--------------
-| `create-container-image` | `image-tar-file` | Image to scan
+`[container-image-pull-repository-type, container-image-repository-type]` \
+ | Yes | 'containers-storage:' \
+ | \
+ Container repository type for the pull image source. \
+ See https://github.com/containers/skopeo for valid \
+ options.
Results
-------
@@ -52,23 +49,32 @@
import os
import re
-import sys
from distutils.util import strtobool
from io import StringIO
import sh
from ploigos_step_runner import StepResult, StepRunnerException
from ploigos_step_runner.step_implementer import StepImplementer
-from ploigos_step_runner.utils.file import download_and_decompress_source_to_destination
-from ploigos_step_runner.utils.io import create_sh_redirect_to_multiple_streams_fn_callback
+from ploigos_step_runner.utils.containers import (create_container_from_image,
+ mount_container)
+from ploigos_step_runner.utils.file import \
+ download_and_decompress_source_to_destination
+from ploigos_step_runner.utils.io import \
+ create_sh_redirect_to_multiple_streams_fn_callback
DEFAULT_CONFIG = {
- 'oscap-fetch-remote-resources': True
+ 'oscap-fetch-remote-resources': True,
+ 'container-image-pull-repository-type': 'containers-storage:',
+ 'container-image-repository-type': 'containers-storage:'
}
REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [
'oscap-input-definitions-uri',
- 'image-tar-file'
+ 'container-image-tag',
+ 'container-image-pull-repository-type',
+
+ # being flexible for different use cases of proceeding steps
+ ['container-image-pull-repository-type', 'container-image-repository-type']
]
@@ -226,23 +232,23 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements
"""
step_result = StepResult.from_step_implementer(self)
- image_tar_file = self.get_value('image-tar-file')
-
+ # get config
+ image_tag = self.get_value('container-image-tag')
oscap_profile = self.get_value('oscap-profile')
oscap_fetch_remote_resources = self.get_value('oscap-fetch-remote-resources')
-
- # create a container name from the tar file name, step name, and sub step name
- container_name = os.path.splitext(os.path.basename(image_tar_file))[0]
- container_name += f"-{self.step_name}-{self.sub_step_name}"
+ pull_repository_type = self.get_value([
+ 'container-image-pull-repository-type',
+ 'container-image-repository-type'
+ ])
try:
- # import image tar file to vfs file system
- print(f"\nImport image: {image_tar_file}")
- OpenSCAPGeneric.__buildah_import_image_from_tar(
- image_tar_file=image_tar_file,
- container_name=container_name
+ # create container from image that can be mounted
+ print(f"\nCreate container from image ({image_tag})")
+ container_name = create_container_from_image(
+ image_tag=image_tag,
+ repository_type=pull_repository_type
)
- print(f"Imported image: {image_tar_file}")
+ print(f"Created container ({container_name}) from image ({image_tag})")
# baking `buildah unshare` command to wrap other buildah commands with
# so that container does not need to be running in a privileged mode to be able
@@ -254,7 +260,7 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements
# NOTE: run in the context of `buildah unshare` so that container does not
# need to be run in a privileged mode
print(f"\nMount container: {container_name}")
- container_mount_path = OpenSCAPGeneric.__buildah_mount_container(
+ container_mount_path = mount_container(
buildah_unshare_command=buildah_unshare_command,
container_id=container_name
)
@@ -352,96 +358,12 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements
name='stdout-report',
value=oscap_out_file_path
)
- except StepRunnerException as error:
+ except (StepRunnerException, RuntimeError) as error:
step_result.success = False
step_result.message = str(error)
return step_result
- @staticmethod
- def __buildah_import_image_from_tar(image_tar_file, container_name):
- """Import a container image using buildah form a TAR file.
-
- Parameters
- ----------
- image_tar_file : str
- Path to TAR file to import as a container image.
- container_name : str
- name for the working container.
-
- Returns
- -------
- str
- Name of the imported container.
-
- Raises
- ------
- StepRunnerException
- If error importing image.
- """
- # import image tar file to vfs file system
- try:
- sh.buildah( # pylint: disable=no-member
- 'from',
- '--storage-driver', 'vfs',
- '--name', container_name,
- f"docker-archive:{image_tar_file}",
- _out=sys.stdout,
- _err=sys.stderr,
- _tee='err'
- )
- except sh.ErrorReturnCode as error:
- raise StepRunnerException(
- f'Error importing the image ({image_tar_file}): {error}'
- ) from error
-
- return container_name
-
- @staticmethod
- def __buildah_mount_container(buildah_unshare_command, container_id):
- """Use buildah to mount a container.
-
- Parameters
- ----------
- buildah_unshare_command : sh.buildah.unshare.bake()
- A baked sh.buildah.unshare command to use to run this command in the context off
- so that this can be done "rootless".
- container_id : str
- ID of the container to mount.
-
- Returns
- -------
- str
- Absolute path to the mounted container.
-
- Raises
- ------
- StepRunnerException
- If error mounting the container.
- """
- mount_path = None
- try:
- buildah_mount_out_buff = StringIO()
- buildah_mount_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([
- sys.stdout,
- buildah_mount_out_buff
- ])
- buildah_mount_command = buildah_unshare_command.bake("buildah", "mount")
- buildah_mount_command(
- '--storage-driver', 'vfs',
- container_id,
- _out=buildah_mount_out_callback,
- _err=sys.stderr,
- _tee='err'
- )
- mount_path = buildah_mount_out_buff.getvalue().rstrip()
- except sh.ErrorReturnCode as error:
- raise StepRunnerException(
- f'Error mounting container ({container_id}): {error}'
- ) from error
-
- return mount_path
-
@staticmethod
def __get_oscap_document_type(oscap_input_file):
"""Gets the OpenSCAP document type for a given input file.
diff --git a/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py b/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py
index a584e46b0..186e70b9f 100644
--- a/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py
+++ b/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py
@@ -41,6 +41,7 @@
`container-image-signature-signer-private-key-fingerprint` \
| Fingerprint for the private key used to sign \
the container image.
+`container-image-signed-tag` | TODO
`container-image-signature-file-path` | File path to created image signature.
`container-image-signature-name` | Fully qualified name of the \
name of the image signature, \
@@ -75,7 +76,9 @@
# signer-pgp-private-key - new key name
# container-image-signer-pgp-private-key - old key name
['signer-pgp-private-key', 'container-image-signer-pgp-private-key'],
- 'container-image-tag'
+
+ # being flexible for different use cases of proceeding steps
+ ['container-image-push-tag', 'container-image-tag']
]
class PodmanSign(StepImplementer):
@@ -131,12 +134,11 @@ def _run_step(self):
)
# get the uri to the image to sign
- container_image_tag = self.get_value('container-image-tag')
+ container_image_tag = self.get_value(['container-image-push-tag', 'container-image-tag'])
image_signatures_directory = self.create_working_dir_sub_dir(
sub_dir_relative_path='image-signature'
)
-
try:
# import the PGP key and get the finger print
signer_pgp_private_key_fingerprint = import_pgp_key(
@@ -166,6 +168,10 @@ def _run_step(self):
image_signatures_directory=image_signatures_directory,
container_image_tag=container_image_tag
)
+ step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
step_result.add_artifact(
name='container-image-signature-file-path',
value=signature_file_path,
diff --git a/src/ploigos_step_runner/utils/containers.py b/src/ploigos_step_runner/utils/containers.py
index 280039208..0aaf193df 100644
--- a/src/ploigos_step_runner/utils/containers.py
+++ b/src/ploigos_step_runner/utils/containers.py
@@ -2,9 +2,13 @@
"""
import sys
+from io import StringIO
import sh
from ploigos_step_runner.config.config_value import ConfigValue
+from ploigos_step_runner.utils.io import \
+ create_sh_redirect_to_multiple_streams_fn_callback
+
def container_registries_login( #pylint: disable=too-many-branches
registries,
@@ -275,3 +279,99 @@ def container_registry_login( #pylint: disable=too-many-arguments,too-many-branc
f"Failed to login to container registry ({container_registry_uri}) "
f"with username ({container_registry_username}): {error}"
) from error
+
+def create_container_from_image(
+ image_tag,
+ repository_type='container-storage:'
+):
+ """Import a container image using buildah form a TAR file.
+
+ Parameters
+ ----------
+ image_tag : str
+ Image tag to create a container from.
+ ex:
+ * localhost/my-app:latest
+ * quay.io/my-org/my-app:latest
+ * docker-archive:/local/path/to/my-app-container-image.tar
+ container_name : str
+ name for the working container.
+ repository_type : str
+ The type of repository to mount the given image tag from.
+ See https://github.com/containers/skopeo for details on different repository types.
+
+ Returns
+ -------
+ str
+ Name of the imported container.
+
+ Raises
+ ------
+ RuntimeError
+ If error importing image.
+ """
+ container_name = None
+ try:
+ buildah_from_out_buff = StringIO()
+ buildah_from_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([
+ sys.stdout,
+ buildah_from_out_buff
+ ])
+ sh.buildah( # pylint: disable=no-member
+ 'from',
+ f"{repository_type}{image_tag}",
+ _out=buildah_from_out_callback,
+ _err=sys.stderr,
+ _tee='err'
+ )
+ container_name = buildah_from_out_buff.getvalue().rstrip()
+ except sh.ErrorReturnCode as error:
+ raise RuntimeError(
+ f'Error creating container from image ({image_tag}): {error}'
+ ) from error
+
+ return container_name
+
+
+def mount_container(buildah_unshare_command, container_id):
+ """Use buildah to mount a container.
+
+ Parameters
+ ----------
+ buildah_unshare_command : sh.buildah.unshare.bake()
+ A baked sh.buildah.unshare command to use to run this command in the context off
+ so that this can be done "rootless".
+ container_id : str
+ ID of the container to mount.
+
+ Returns
+ -------
+ str
+ Absolute path to the mounted container.
+
+ Raises
+ ------
+ RuntimeError
+ If error mounting the container.
+ """
+ mount_path = None
+ try:
+ buildah_mount_out_buff = StringIO()
+ buildah_mount_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([
+ sys.stdout,
+ buildah_mount_out_buff
+ ])
+ buildah_mount_command = buildah_unshare_command.bake("buildah", "mount")
+ buildah_mount_command(
+ container_id,
+ _out=buildah_mount_out_callback,
+ _err=sys.stderr,
+ _tee='err'
+ )
+ mount_path = buildah_mount_out_buff.getvalue().rstrip()
+ except sh.ErrorReturnCode as error:
+ raise RuntimeError(
+ f'Error mounting container ({container_id}): {error}'
+ ) from error
+
+ return mount_path
diff --git a/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py b/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py
index 8ebc501c5..2fbab135b 100644
--- a/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py
+++ b/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py
@@ -1,14 +1,15 @@
import os
import re
-from testfixtures import TempDirectory
-from tests.step_implementers.shared.test_openscap_generic import \
- TestStepImplementerSharedOpenSCAPGeneric
+import tests.step_implementers.shared.test_openscap_generic
from ploigos_step_runner.step_implementers.container_image_static_compliance_scan import \
OpenSCAP
+from testfixtures import TempDirectory
+from tests.helpers.base_step_implementer_test_case import \
+ BaseStepImplementerTestCase
-class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP(TestStepImplementerSharedOpenSCAPGeneric):
+class BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP(BaseStepImplementerTestCase):
def create_step_implementer(
self,
step_config={},
@@ -26,123 +27,52 @@ def create_step_implementer(
parent_work_dir_path=parent_work_dir_path
)
- def test__validate_required_config_or_previous_step_result_artifact_keys_valid(self):
- step_config = {
- 'oscap-input-definitions-uri': 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml',
- 'oscap-profile': 'foo',
- 'image-tar-file': 'does-not-matter'
- }
-
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
-
- def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_extension(self):
- oscap_input_definitions_uri = 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml.foo'
- step_config = {
- 'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'oscap-profile': 'foo',
- 'image-tar-file': 'does-not-matter'
- }
-
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- with self.assertRaisesRegex(
- AssertionError,
- re.compile(
- r'Open SCAP input definitions source '
- rf'\({oscap_input_definitions_uri}\) must be of known type \(xml\|bz2\), got: \.foo'
- )
- ):
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
-
- def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_protocal(self):
- oscap_input_definitions_uri = 'foo://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml'
- step_config = {
- 'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'oscap-profile': 'foo',
- 'image-tar-file': 'does-not-matter'
- }
-
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- with self.assertRaisesRegex(
- AssertionError,
- re.compile(
- r'Open SCAP input definitions source '
- rf'\({oscap_input_definitions_uri}\) must start with known protocol '
- r'\(file://\|http://\|https://\)\.'
- )
- ):
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
-
- def test__validate_required_config_or_previous_step_result_artifact_keys_missing_oscap_profile(self):
- oscap_input_definitions_uri = 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml'
- step_config = {
- 'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'image-tar-file': 'does-not-matter'
- }
-
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- with self.assertRaisesRegex(
- AssertionError,
- re.compile(
- r"Missing required step configuration or previous step result"
- r" artifact keys: \['oscap-profile'\]"
- )
- ):
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
-
- def test__validate_required_config_or_previous_step_result_artifact_keys_missing_required_keys(self):
- step_config = {}
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- with self.assertRaisesRegex(
- AssertionError,
- re.compile(
- r"Missing required step configuration or previous step result"
- r" artifact keys: \['oscap-profile', 'oscap-input-definitions-uri', 'image-tar-file'\]"
- )
- ):
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP_step_implementer_config_defaults(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults
+):
+ pass
+
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP__required_config_or_result_keys(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP
+):
+ def test_result(self):
+ required_keys = OpenSCAP._required_config_or_result_keys()
+ expected_required_keys = [
+ 'oscap-profile',
+ 'oscap-input-definitions-uri',
+ 'container-image-tag',
+ 'container-image-pull-repository-type',
+ ['container-image-pull-repository-type', 'container-image-repository-type']
+ ]
+ self.assertEqual(required_keys, expected_required_keys)
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__validate_required_config_or_previous_step_result_artifact_keys(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys
+):
+ pass
+
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___get_oscap_document_type(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type
+):
+ pass
+
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___get_oscap_eval_type_based_on_document_type(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type
+):
+ pass
+
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___run_oscap_scan(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan
+):
+ pass
+
+class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP__run_step(
+ BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__run_step
+):
+ pass
diff --git a/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py b/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py
index 0efeae61f..31da9cec4 100644
--- a/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py
+++ b/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py
@@ -1,10 +1,11 @@
-from tests.step_implementers.shared.test_openscap_generic import \
- TestStepImplementerSharedOpenSCAPGeneric
+import tests.step_implementers.shared.test_openscap_generic
from ploigos_step_runner.step_implementers.container_image_static_vulnerability_scan import \
OpenSCAP
+from tests.helpers.base_step_implementer_test_case import \
+ BaseStepImplementerTestCase
-class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP(TestStepImplementerSharedOpenSCAPGeneric):
+class BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP(BaseStepImplementerTestCase):
def create_step_implementer(
self,
step_config={},
@@ -21,3 +22,45 @@ def create_step_implementer(
workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP_step_implementer_config_defaults(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__required_config_or_result_keys(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__required_config_or_result_keys
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__validate_required_config_or_previous_step_result_artifact_keys(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___get_oscap_document_type(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___get_oscap_eval_type_based_on_document_type(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___run_oscap_scan(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan
+):
+ pass
+
+class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__run_step(
+ BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP,
+ tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__run_step
+):
+ pass
diff --git a/tests/step_implementers/create_container_image/test_buildah_create_container_image.py b/tests/step_implementers/create_container_image/test_buildah_create_container_image.py
index 9d028ef02..61caff3a6 100644
--- a/tests/step_implementers/create_container_image/test_buildah_create_container_image.py
+++ b/tests/step_implementers/create_container_image/test_buildah_create_container_image.py
@@ -13,7 +13,7 @@
BaseStepImplementerTestCase
-class TestStepImplementerCreateContainerImageBuildah(BaseStepImplementerTestCase):
+class BaseTestStepImplementerCreateContainerImageBuildah(BaseStepImplementerTestCase):
def create_step_implementer(
self,
step_config={},
@@ -31,11 +31,114 @@ def create_step_implementer(
parent_work_dir_path=parent_work_dir_path
)
-# TESTS FOR configuration checks
- def test_step_implementer_config_defaults(self):
+@patch("ploigos_step_runner.StepImplementer._validate_required_config_or_previous_step_result_artifact_keys")
+class TestStepImplementerCreateContainerImageBuildah__validate_required_config_or_previous_step_result_artifact_keys(
+ BaseTestStepImplementerCreateContainerImageBuildah
+):
+ def test_valid_defaults(self, mock_super_validate):
+ with TempDirectory() as test_dir:
+ # setup
+ parent_work_dir_path = os.path.join(test_dir.path, 'working')
+ step_config = {
+ 'context': test_dir.path,
+ 'organization': 'org-name',
+ 'service-name': 'service-name',
+ 'application-name': 'app-name'
+ }
+ test_dir.write('Containerfile',b'''testing''')
+
+ # run test
+ step_implementer = self.create_step_implementer(
+ step_config=step_config,
+ parent_work_dir_path=parent_work_dir_path,
+ )
+ step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
+
+ # validate
+ mock_super_validate.assert_called_once_with()
+
+ def test_valid_custom_imagespecfile(self, mock_super_validate):
+ with TempDirectory() as test_dir:
+ # setup
+ parent_work_dir_path = os.path.join(test_dir.path, 'working')
+ step_config = {
+ 'context': test_dir.path,
+ 'organization': 'org-name',
+ 'service-name': 'service-name',
+ 'application-name': 'app-name',
+ 'imagespecfile': 'MockContainerfile.ubi8'
+ }
+ test_dir.write('MockContainerfile.ubi8',b'''testing''')
+
+ # run test
+ step_implementer = self.create_step_implementer(
+ step_config=step_config,
+ parent_work_dir_path=parent_work_dir_path,
+ )
+ step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
+
+ # validate
+ mock_super_validate.assert_called_once_with()
+
+ def test_fail_missing_imagespecfile_defaults(self, mock_super_validate):
+ with TempDirectory() as test_dir:
+ # setup
+ parent_work_dir_path = os.path.join(test_dir.path, 'working')
+ step_config = {
+ 'context': test_dir.path,
+ 'organization': 'org-name',
+ 'service-name': 'service-name',
+ 'application-name': 'app-name'
+ }
+
+ # run test
+ step_implementer = self.create_step_implementer(
+ step_config=step_config,
+ parent_work_dir_path=parent_work_dir_path,
+ )
+ with self.assertRaisesRegex(
+ AssertionError,
+ rf'Given imagespecfile \(Containerfile\) does not exist'
+ rf' in given context \({test_dir.path}\).'
+ ):
+ step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
+
+ # validate
+ mock_super_validate.assert_called_once_with()
+
+ def test_fail_missing_imagespecfile_custom_imagespecfile(self, mock_super_validate):
+ with TempDirectory() as test_dir:
+ # setup
+ parent_work_dir_path = os.path.join(test_dir.path, 'working')
+ step_config = {
+ 'context': test_dir.path,
+ 'organization': 'org-name',
+ 'service-name': 'service-name',
+ 'application-name': 'app-name',
+ 'imagespecfile': 'MockContainerfile.ubi8'
+ }
+
+ # run test
+ step_implementer = self.create_step_implementer(
+ step_config=step_config,
+ parent_work_dir_path=parent_work_dir_path,
+ )
+ with self.assertRaisesRegex(
+ AssertionError,
+ rf'Given imagespecfile \(MockContainerfile.ubi8\) does not exist'
+ rf' in given context \({test_dir.path}\).'
+ ):
+ step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
+
+ # validate
+ mock_super_validate.assert_called_once_with()
+
+class TestStepImplementerCreateContainerImageBuildah_step_implementer_config_defaults(
+ BaseTestStepImplementerCreateContainerImageBuildah
+):
+ def test_result(self):
defaults = Buildah.step_implementer_config_defaults()
expected_defaults = {
- 'containers-config-auth-file': os.path.join(Path.home(), '.buildah-auth.json'),
'imagespecfile': 'Containerfile',
'context': '.',
'tls-verify': True,
@@ -43,22 +146,29 @@ def test_step_implementer_config_defaults(self):
}
self.assertEqual(defaults, expected_defaults)
- def test__required_config_or_result_keys(self):
+class TestStepImplementerCreateContainerImageBuildah___required_config_or_result_keys(
+ BaseTestStepImplementerCreateContainerImageBuildah
+):
+ def test_result(self):
required_keys = Buildah._required_config_or_result_keys()
expected_required_keys = [
- 'containers-config-auth-file',
'imagespecfile',
'context',
'tls-verify',
'format',
+ 'organization',
'service-name',
'application-name'
]
self.assertEqual(required_keys, expected_required_keys)
+class TestStepImplementerCreateContainerImageBuildah___run_step(
+ BaseTestStepImplementerCreateContainerImageBuildah
+):
@patch('sh.buildah', create=True)
- def test__run_step_pass(self, buildah_mock):
+ def test_pass(self, buildah_mock):
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
temp_dir.write('Containerfile',b'''testing''')
@@ -68,11 +178,11 @@ def test__run_step_pass(self, buildah_mock):
workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
step_config = {
- 'containers-config-auth-file': 'buildah-auth.json',
'imagespecfile': 'Containerfile',
'context': temp_dir.path,
'tls-verify': True,
'format': 'oci',
+ 'organization': 'org-name',
'service-name': 'service-name',
'application-name': 'app-name'
}
@@ -84,109 +194,168 @@ def test__run_step_pass(self, buildah_mock):
parent_work_dir_path=parent_work_dir_path
)
-
-
+ # run test
result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='create-container-image',
sub_step_name='Buildah',
sub_step_implementer_name='Buildah'
)
+ expected_step_result.add_artifact(
+ name='container-image-registry-uri',
+ value='localhost',
+ description='Registry URI poriton of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-registry-organization',
+ value='org-name',
+ description='Organization portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-repository',
+ value='app-name-service-name',
+ description='Repository portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-name',
+ value='app-name-service-name',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the built container image.'
+ )
expected_step_result.add_artifact(
name='container-image-version',
- value='localhost/app-name/service-name:1.0-123abc'
+ value='1.0-123abc',
+ description='Version portion of the container image tag of the built container image.'
)
expected_step_result.add_artifact(
- name='image-tar-file',
- value=f'{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar'
+ name='container-image-tag',
+ value='localhost/org-name/app-name-service-name:1.0-123abc',
+ description='Full container image tag of the built container,' \
+ ' including the registry URI.'
)
-
+ expected_step_result.add_artifact(
+ name='container-image-short-tag',
+ value='org-name/app-name-service-name:1.0-123abc',
+ description='Short container image tag of the built container image,' \
+ ' excluding the registry URI.'
+ )
+ self.assertEqual(result, expected_step_result)
buildah_mock.bud.assert_called_once_with(
'--format=oci',
'--tls-verify=true',
'--layers', '-f', 'Containerfile',
- '-t', 'localhost/app-name/service-name:1.0-123abc',
- '--authfile', 'buildah-auth.json',
+ '-t', 'localhost/org-name/app-name-service-name:1.0-123abc',
+ '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'),
temp_dir.path,
_out=sys.stdout,
_err=sys.stderr,
_tee='err'
)
- buildah_mock.push.assert_called_once_with(
- 'localhost/app-name/service-name:1.0-123abc',
- f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar',
- _out=sys.stdout,
- _err=sys.stderr,
- _tee='err'
- )
- self.assertEqual(result, expected_step_result)
-
@patch('sh.buildah', create=True)
- def test__run_step_pass_no_container_image_version(self, buildah_mock):
+ def test_pass_custom_auth_file(self, buildah_mock):
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
temp_dir.write('Containerfile',b'''testing''')
+ artifact_config = {
+ 'container-image-version': {'description': '', 'value': '1.0-123abc'},
+ }
+ workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
+
step_config = {
- 'containers-config-auth-file': 'buildah-auth.json',
'imagespecfile': 'Containerfile',
'context': temp_dir.path,
'tls-verify': True,
'format': 'oci',
+ 'organization': 'org-name',
'service-name': 'service-name',
- 'application-name': 'app-name'
+ 'application-name': 'app-name',
+ 'containers-config-auth-file': 'mock-auth.json'
}
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='create-container-image',
implementer='Buildah',
- parent_work_dir_path=parent_work_dir_path,
+ workflow_result=workflow_result,
+ parent_work_dir_path=parent_work_dir_path
)
+ # run test
result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='create-container-image',
sub_step_name='Buildah',
sub_step_implementer_name='Buildah'
)
+ expected_step_result.add_artifact(
+ name='container-image-registry-uri',
+ value='localhost',
+ description='Registry URI poriton of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-registry-organization',
+ value='org-name',
+ description='Organization portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-repository',
+ value='app-name-service-name',
+ description='Repository portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-name',
+ value='app-name-service-name',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the built container image.'
+ )
expected_step_result.add_artifact(
name='container-image-version',
- value='localhost/app-name/service-name:latest'
+ value='1.0-123abc',
+ description='Version portion of the container image tag of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-tag',
+ value='localhost/org-name/app-name-service-name:1.0-123abc',
+ description='Full container image tag of the built container,' \
+ ' including the registry URI.'
)
expected_step_result.add_artifact(
- name='image-tar-file',
- value=f'{step_implementer.work_dir_path}/image-app-name-service-name-latest.tar'
+ name='container-image-short-tag',
+ value='org-name/app-name-service-name:1.0-123abc',
+ description='Short container image tag of the built container image,' \
+ ' excluding the registry URI.'
)
+ self.assertEqual(result, expected_step_result)
buildah_mock.bud.assert_called_once_with(
'--format=oci',
'--tls-verify=true',
'--layers', '-f', 'Containerfile',
- '-t', 'localhost/app-name/service-name:latest',
- '--authfile', 'buildah-auth.json',
+ '-t', 'localhost/org-name/app-name-service-name:1.0-123abc',
+ '--authfile', 'mock-auth.json',
temp_dir.path,
_out=sys.stdout,
_err=sys.stderr,
_tee='err'
)
- buildah_mock.push.assert_called_once_with(
- 'localhost/app-name/service-name:latest',
- f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-latest.tar',
- _out=sys.stdout,
- _err=sys.stderr,
- _tee='err'
- )
- self.assertEqual(result, expected_step_result)
-
@patch('sh.buildah', create=True)
- def test__run_step_pass_image_tar_file_exists(self, buildah_mock):
+ def test_pass_string_tls_verify(self, buildah_mock):
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
temp_dir.write('Containerfile',b'''testing''')
@@ -196,11 +365,11 @@ def test__run_step_pass_image_tar_file_exists(self, buildah_mock):
workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
step_config = {
- 'containers-config-auth-file': 'buildah-auth.json',
'imagespecfile': 'Containerfile',
'context': temp_dir.path,
- 'tls-verify': True,
+ 'tls-verify': 'true',
'format': 'oci',
+ 'organization': 'org-name',
'service-name': 'service-name',
'application-name': 'app-name'
}
@@ -212,57 +381,82 @@ def test__run_step_pass_image_tar_file_exists(self, buildah_mock):
parent_work_dir_path=parent_work_dir_path
)
- step_implementer.write_working_file('image-app-name-service-name-1.0-123abc.tar')
-
+ # run test
result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='create-container-image',
sub_step_name='Buildah',
sub_step_implementer_name='Buildah'
)
+ expected_step_result.add_artifact(
+ name='container-image-registry-uri',
+ value='localhost',
+ description='Registry URI poriton of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-registry-organization',
+ value='org-name',
+ description='Organization portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-repository',
+ value='app-name-service-name',
+ description='Repository portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-name',
+ value='app-name-service-name',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the built container image.'
+ )
expected_step_result.add_artifact(
name='container-image-version',
- value='localhost/app-name/service-name:1.0-123abc'
+ value='1.0-123abc',
+ description='Version portion of the container image tag of the built container image.'
)
expected_step_result.add_artifact(
- name='image-tar-file',
- value=f'{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar'
+ name='container-image-tag',
+ value='localhost/org-name/app-name-service-name:1.0-123abc',
+ description='Full container image tag of the built container,' \
+ ' including the registry URI.'
)
-
+ expected_step_result.add_artifact(
+ name='container-image-short-tag',
+ value='org-name/app-name-service-name:1.0-123abc',
+ description='Short container image tag of the built container image,' \
+ ' excluding the registry URI.'
+ )
+ self.assertEqual(result, expected_step_result)
buildah_mock.bud.assert_called_once_with(
'--format=oci',
'--tls-verify=true',
'--layers', '-f', 'Containerfile',
- '-t', 'localhost/app-name/service-name:1.0-123abc',
- '--authfile', 'buildah-auth.json',
+ '-t', 'localhost/org-name/app-name-service-name:1.0-123abc',
+ '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'),
temp_dir.path,
_out=sys.stdout,
_err=sys.stderr,
_tee='err'
)
- buildah_mock.push.assert_called_once_with(
- 'localhost/app-name/service-name:1.0-123abc',
- f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar',
- _out=sys.stdout,
- _err=sys.stderr,
- _tee='err'
- )
- self.assertEqual(result, expected_step_result)
-
@patch('sh.buildah', create=True)
- def test__run_step_fail_no_image_spec_file(self, buildah_mock):
+ def test_pass_no_container_image_version(self, buildah_mock):
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- artifact_config = {
- 'container-image-version': {'description': '', 'value': '1.0-123abc'},
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
+ temp_dir.write('Containerfile',b'''testing''')
step_config = {
+ 'imagespecfile': 'Containerfile',
+ 'context': temp_dir.path,
+ 'tls-verify': True,
+ 'format': 'oci',
+ 'organization': 'org-name',
'service-name': 'service-name',
'application-name': 'app-name'
}
@@ -270,24 +464,75 @@ def test__run_step_fail_no_image_spec_file(self, buildah_mock):
step_config=step_config,
step_name='create-container-image',
implementer='Buildah',
- workflow_result=workflow_result,
- parent_work_dir_path=parent_work_dir_path
+ parent_work_dir_path=parent_work_dir_path,
)
+ # run test
result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='create-container-image',
sub_step_name='Buildah',
sub_step_implementer_name='Buildah'
)
- expected_step_result.success = False
- expected_step_result.message = 'Image specification file does not exist in location: ./Containerfile'
-
+ expected_step_result.add_artifact(
+ name='container-image-registry-uri',
+ value='localhost',
+ description='Registry URI poriton of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-registry-organization',
+ value='org-name',
+ description='Organization portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-repository',
+ value='app-name-service-name',
+ description='Repository portion of the container image tag' \
+ ' of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-name',
+ value='app-name-service-name',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-version',
+ value='latest',
+ description='Version portion of the container image tag of the built container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-tag',
+ value='localhost/org-name/app-name-service-name:latest',
+ description='Full container image tag of the built container,' \
+ ' including the registry URI.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-short-tag',
+ value='org-name/app-name-service-name:latest',
+ description='Short container image tag of the built container image,' \
+ ' excluding the registry URI.'
+ )
self.assertEqual(result, expected_step_result)
+ buildah_mock.bud.assert_called_once_with(
+ '--format=oci',
+ '--tls-verify=true',
+ '--layers', '-f', 'Containerfile',
+ '-t', 'localhost/org-name/app-name-service-name:latest',
+ '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'),
+ temp_dir.path,
+ _out=sys.stdout,
+ _err=sys.stderr,
+ _tee='err'
+ )
+
@patch('sh.buildah', create=True)
- def test__run_step_fail_buildah_bud_error(self, buildah_mock):
+ def test_fail_buildah_bud_error(self, buildah_mock):
with TempDirectory() as temp_dir:
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
temp_dir.write('Containerfile',b'''testing''')
@@ -299,7 +544,6 @@ def test__run_step_fail_buildah_bud_error(self, buildah_mock):
image_spec_file = 'Containerfile'
step_config = {
- 'containers-config-auth-file': 'buildah-auth.json',
'imagespecfile': image_spec_file,
'context': temp_dir.path,
'tls-verify': True,
@@ -333,57 +577,3 @@ def test__run_step_fail_buildah_bud_error(self, buildah_mock):
re.DOTALL
)
)
-
- @patch('sh.buildah', create=True)
- def test__run_step_fail_buildah_push_error(self, buildah_mock):
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
- temp_dir.write('Containerfile',b'''testing''')
-
- application_name = 'app-name'
- service_name = 'service-name'
- image_tag_version = '1.0-123abc'
-
- artifact_config = {
- 'container-image-version': {'description': '', 'value': image_tag_version},
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
- step_config = {
- 'containers-config-auth-file': 'buildah-auth.json',
- 'imagespecfile': 'Containerfile',
- 'context': temp_dir.path,
- 'tl-sverify': 'true',
- 'format': 'oci',
- 'service-name': service_name,
- 'application-name': application_name
- }
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='create-container-image',
- implementer='Buildah',
- workflow_result=workflow_result,
- parent_work_dir_path=parent_work_dir_path
- )
-
- buildah_mock.push.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error')
-
- result = step_implementer._run_step()
-
- image_tar_path = os.path.join(
- step_implementer.work_dir_path,
- f'image-{application_name}-{service_name}-{image_tag_version}.tar'
- )
- self.assertFalse(result.success)
- self.assertRegex(
- result.message,
- re.compile(
- rf'Issue invoking buildah push to tar file \({image_tar_path}\):'
- r'.*RAN: buildah'
- r'.*STDOUT:'
- r'.*mock out'
- r'.*STDERR:'
- r'.*mock error',
- re.DOTALL
- )
- )
\ No newline at end of file
diff --git a/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py b/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py
index d497e07fc..169ddf6d9 100644
--- a/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py
+++ b/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py
@@ -1,22 +1,17 @@
-# pylint: disable=missing-module-docstring
-# pylint: disable=missing-class-docstring
-# pylint: disable=missing-function-docstring
import os
-import re
from io import IOBase
-from pathlib import Path
from unittest.mock import patch
import sh
+from ploigos_step_runner import StepResult
+from ploigos_step_runner.step_implementers.push_container_image import Skopeo
from testfixtures import TempDirectory
from tests.helpers.base_step_implementer_test_case import \
BaseStepImplementerTestCase
from tests.helpers.test_utils import Any
-from ploigos_step_runner.step_implementers.push_container_image import Skopeo
-from ploigos_step_runner import StepResult
-class TestStepImplementerSkopeoSourceBase(BaseStepImplementerTestCase):
+class BaseTestStepImplementerSkopeoSourceBase(BaseStepImplementerTestCase):
def create_step_implementer(
self,
step_config={},
@@ -34,122 +29,320 @@ def create_step_implementer(
parent_work_dir_path=parent_work_dir_path
)
- def test_step_implementer_config_defaults(self):
+class TestStepImplementerSkopeoSourceBase_step_implementer_config_defaults(
+ BaseTestStepImplementerSkopeoSourceBase
+):
+ def test_result(self):
defaults = Skopeo.step_implementer_config_defaults()
expected_defaults = {
- 'containers-config-auth-file': os.path.join(
- Path.home(),
- '.container-image-repo-auth.json'
- ),
- 'dest-tls-verify': 'true',
- 'src-tls-verify': 'true'
+ 'src-tls-verify': True,
+ 'dest-tls-verify': True,
+ 'container-image-pull-repository-type': 'containers-storage:',
+ 'container-image-push-repository-type': 'docker://'
}
self.assertEqual(defaults, expected_defaults)
- def test__required_config_or_result_keys(self):
+class TestStepImplementerSkopeoSourceBase___required_config_or_result_keys(
+ BaseTestStepImplementerSkopeoSourceBase
+):
+ def test_result(self):
required_keys = Skopeo._required_config_or_result_keys()
expected_required_keys = [
- 'containers-config-auth-file',
'destination-url',
- 'src-tls-verify',
+ ['source-tls,verify', 'src-tls-verify'],
'dest-tls-verify',
'service-name',
'application-name',
'organization',
- 'container-image-version',
- 'image-tar-file'
+ ['container-image-pull-tag', 'container-image-tag'],
+ ['container-image-pull-repository-type', 'container-image-repository-type'],
+ ['container-image-push-repository-type', 'container-image-repository-type']
]
self.assertEqual(required_keys, expected_required_keys)
+class TestStepImplementerSkopeoSourceBase__run_step(
+ BaseTestStepImplementerSkopeoSourceBase
+):
+ def __run_pass_test(
+ self,
+ step_config,
+ image_version,
+ image_pull_tag,
+ image_push_tag,
+ temp_dir,
+ skopeo_mock,
+ skopeo_mock_call_dest_tls_value='true',
+ skopeo_mock_call_src_tls_value='true',
+ expected_step_result_message=None,
+ containers_config_auth_file=None
+ ):
+ parent_work_dir_path = os.path.join(temp_dir.path, 'working')
+
+ # setup step
+ step_implementer = self.create_step_implementer(
+ step_config=step_config,
+ step_name='push-container-image',
+ implementer='Skopeo',
+ parent_work_dir_path=parent_work_dir_path,
+ )
+
+ # run step
+ result = step_implementer._run_step()
+
+ # verify
+ expected_step_result = StepResult(
+ step_name='push-container-image',
+ sub_step_name='Skopeo',
+ sub_step_implementer_name='Skopeo'
+ )
+ if expected_step_result_message:
+ expected_step_result.message = expected_step_result_message
+ expected_step_result.add_artifact(
+ name='container-image-registry-uri',
+ value='fake-registry.xyz',
+ description='Registry URI poriton of the container image tag' \
+ ' of the pushed container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-registry-organization',
+ value='fake-org',
+ description='Organization portion of the container image tag' \
+ ' of the pushed container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-repository',
+ value='fake-app-fake-service',
+ description='Repository portion of the container image tag' \
+ ' of the pushed container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-name',
+ value='fake-app-fake-service',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the pushed container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-version',
+ value=image_version,
+ description='Version portion of the container image tag of the pushed container image.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-tag',
+ value=f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}',
+ description='Full container image tag of the pushed container,' \
+ ' including the registry URI.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-short-tag',
+ value=f'fake-org/fake-app-fake-service:{image_version}',
+ description='Short container image tag of the pushed container image,' \
+ ' excluding the registry URI.'
+ )
+ self.assertEqual(result, expected_step_result)
+
+ if not containers_config_auth_file:
+ containers_config_auth_file = os.path.join(
+ step_implementer.work_dir_path,
+ 'container-auth.json'
+ )
+
+ skopeo_mock.copy.assert_called_once_with(
+ f"--src-tls-verify={skopeo_mock_call_src_tls_value}",
+ f"--dest-tls-verify={skopeo_mock_call_dest_tls_value}",
+ f"--authfile={containers_config_auth_file}",
+ f'containers-storage:{image_pull_tag}',
+ f'docker://{image_push_tag}',
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
+
@patch.object(sh, 'skopeo', create=True)
- def test_run_step_pass(self, skopeo_mock):
+ def test_pass(self, skopeo_mock):
with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- image_tar_file = 'fake-image.tar'
image_version = '1.0-69442c8'
- image_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
step_config = {
'destination-url': 'fake-registry.xyz',
'service-name': 'fake-service',
'application-name': 'fake-app',
'organization': 'fake-org',
'container-image-version': image_version,
- 'image-tar-file': image_tar_file
+ 'container-image-pull-tag': image_pull_tag
}
- step_implementer = self.create_step_implementer(
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
step_config=step_config,
- step_name='push-container-image',
- implementer='Skopeo',
- parent_work_dir_path=parent_work_dir_path,
+ skopeo_mock=skopeo_mock
)
- result = step_implementer._run_step()
-
- expected_step_result = StepResult(
- step_name='push-container-image',
- sub_step_name='Skopeo',
- sub_step_implementer_name='Skopeo'
- )
- expected_step_result.add_artifact(
- name='container-image-registry-uri',
- value='fake-registry.xyz'
- )
- expected_step_result.add_artifact(
- name='container-image-registry-organization',
- value='fake-org'
- )
- expected_step_result.add_artifact(
- name='container-image-repository',
- value='fake-app-fake-service'
- )
- expected_step_result.add_artifact(
- name='container-image-name',
- value='fake-app-fake-service'
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_default_version(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = 'latest'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-pull-tag': image_pull_tag
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock
)
- expected_step_result.add_artifact(
- name='container-image-version',
- value=image_version
+
+
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_string_destination_tls_truethy(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = '1.0-69442c8'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag,
+ 'dest-tls-verify': 'true'
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock
)
- expected_step_result.add_artifact(
- name='container-image-tag',
- value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8'
+
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_string_destination_tls_falsy(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = '1.0-69442c8'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag,
+ 'dest-tls-verify': 'false'
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock,
+ skopeo_mock_call_dest_tls_value='false'
)
- self.assertEqual(
- result.get_step_result_dict(),
- expected_step_result.get_step_result_dict()
+
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_string_source_tls_truthy(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = '1.0-69442c8'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag,
+ 'src-tls-verify': 'true'
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock
)
- containers_config_auth_file = os.path.join(
- Path.home(),
- '.container-image-repo-auth.json'
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_string_source_tls_falsy(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = '1.0-69442c8'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag,
+ 'src-tls-verify': 'false'
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock,
+ skopeo_mock_call_src_tls_value='false'
)
- skopeo_mock.copy.assert_called_once_with(
- "--src-tls-verify=true",
- "--dest-tls-verify=true",
- f"--authfile={containers_config_auth_file}",
- f'docker-archive:{image_tar_file}',
- f'docker://{image_tag}',
- _out=Any(IOBase),
- _err=Any(IOBase),
- _tee='err'
+
+ @patch.object(sh, 'skopeo', create=True)
+ def test_pass_custom_auth_file(self, skopeo_mock):
+ with TempDirectory() as temp_dir:
+ image_version = '1.0-69442c8'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ step_config = {
+ 'destination-url': 'fake-registry.xyz',
+ 'service-name': 'fake-service',
+ 'application-name': 'fake-app',
+ 'organization': 'fake-org',
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag,
+ 'containers-config-auth-file': 'mock-auth.json'
+ }
+ self.__run_pass_test(
+ temp_dir=temp_dir,
+ image_version=image_version,
+ image_pull_tag=image_pull_tag,
+ image_push_tag=image_push_tag,
+ step_config=step_config,
+ skopeo_mock=skopeo_mock,
+ containers_config_auth_file='mock-auth.json'
)
@patch.object(sh, 'skopeo', create=True)
- def test_run_step_fail_run_skopeo(self, skopeo_mock):
+ def test_fail_run_skopeo(self, skopeo_mock):
with TempDirectory() as temp_dir:
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
- image_tar_file = 'fake-image.tar'
+ # setup step
image_version = '1.0-69442c8'
- image_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
+ image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}'
+ image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}'
step_config = {
'destination-url': 'fake-registry.xyz',
'service-name': 'fake-service',
'application-name': 'fake-app',
'organization': 'fake-org',
'container-image-version': image_version,
- 'image-tar-file': image_tar_file
+ 'container-image-version': image_version,
+ 'container-image-pull-tag': image_pull_tag
}
step_implementer = self.create_step_implementer(
step_config=step_config,
@@ -158,9 +351,11 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock):
parent_work_dir_path=parent_work_dir_path,
)
+ # run step (mock fail)
skopeo_mock.copy.side_effect = sh.ErrorReturnCode('skopeo', b'mock stdout', b'mock error')
result = step_implementer._run_step()
+ # verify
expected_step_result = StepResult(
step_name='push-container-image',
sub_step_name='Skopeo',
@@ -168,31 +363,48 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock):
)
expected_step_result.add_artifact(
name='container-image-registry-uri',
- value='fake-registry.xyz'
+ value='fake-registry.xyz',
+ description='Registry URI poriton of the container image tag' \
+ ' of the pushed container image.'
)
expected_step_result.add_artifact(
name='container-image-registry-organization',
- value='fake-org'
+ value='fake-org',
+ description='Organization portion of the container image tag' \
+ ' of the pushed container image.'
)
expected_step_result.add_artifact(
name='container-image-repository',
- value='fake-app-fake-service'
+ value='fake-app-fake-service',
+ description='Repository portion of the container image tag' \
+ ' of the pushed container image.'
)
expected_step_result.add_artifact(
name='container-image-name',
- value='fake-app-fake-service'
+ value='fake-app-fake-service',
+ description='Another way to reference the' \
+ ' repository portion of the container image tag of the pushed container image.'
)
expected_step_result.add_artifact(
name='container-image-version',
- value=image_version
+ value=image_version,
+ description='Version portion of the container image tag of the pushed container image.'
)
expected_step_result.add_artifact(
name='container-image-tag',
- value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8'
+ value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8',
+ description='Full container image tag of the pushed container,' \
+ ' including the registry URI.'
+ )
+ expected_step_result.add_artifact(
+ name='container-image-short-tag',
+ value='fake-org/fake-app-fake-service:1.0-69442c8',
+ description='Short container image tag of the pushed container image,' \
+ ' excluding the registry URI.'
)
expected_step_result.success = False
- expected_step_result.message = f"Error pushing container image ({image_tar_file}) " +\
- f" to tag ({image_tag}) using skopeo: \n" +\
+ expected_step_result.message = f"Error pushing container image ({image_pull_tag}) " +\
+ f" to tag ({image_push_tag}) using skopeo: \n" +\
f"\n" +\
f" RAN: skopeo\n" +\
f"\n" +\
@@ -201,19 +413,18 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock):
f"\n" +\
f" STDERR:\n" +\
f"mock error"
-
self.assertEqual(result, expected_step_result)
containers_config_auth_file = os.path.join(
- Path.home(),
- '.container-image-repo-auth.json'
+ step_implementer.work_dir_path,
+ 'container-auth.json'
)
skopeo_mock.copy.assert_called_once_with(
"--src-tls-verify=true",
"--dest-tls-verify=true",
f"--authfile={containers_config_auth_file}",
- f'docker-archive:{image_tar_file}',
- f'docker://{image_tag}',
+ f'containers-storage:{image_pull_tag}',
+ f'docker://{image_push_tag}',
_out=Any(IOBase),
_err=Any(IOBase),
_tee='err'
diff --git a/tests/step_implementers/shared/test_openscap_generic.py b/tests/step_implementers/shared/test_openscap_generic.py
index c78b880f6..063a4396f 100644
--- a/tests/step_implementers/shared/test_openscap_generic.py
+++ b/tests/step_implementers/shared/test_openscap_generic.py
@@ -1,6 +1,3 @@
-# pylint: disable=missing-module-docstring
-# pylint: disable=missing-class-docstring
-# pylint: disable=missing-function-docstring
import os
import re
from contextlib import redirect_stdout
@@ -17,7 +14,7 @@
from ploigos_step_runner.step_implementers.shared.openscap_generic import OpenSCAPGeneric
-class TestStepImplementerSharedOpenSCAPGeneric(BaseStepImplementerTestCase):
+class BaseTestStepImplementerSharedOpenSCAPGeneric(BaseStepImplementerTestCase):
def create_step_implementer(
self,
step_config={},
@@ -35,22 +32,37 @@ def create_step_implementer(
parent_work_dir_path=parent_work_dir_path
)
- def test_step_implementer_config_defaults(self):
+class TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
+
+ def test_result(self):
defaults = OpenSCAPGeneric.step_implementer_config_defaults()
expected_defaults = {
- 'oscap-fetch-remote-resources': True
+ 'oscap-fetch-remote-resources': True,
+ 'container-image-pull-repository-type': 'containers-storage:',
+ 'container-image-repository-type': 'containers-storage:'
}
self.assertEqual(defaults, expected_defaults)
- def test__required_config_or_result_keys(self):
+class TestStepImplementerSharedOpenSCAPGeneric__required_config_or_result_keys(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
+ def test_result(self):
required_keys = OpenSCAPGeneric._required_config_or_result_keys()
expected_required_keys = [
'oscap-input-definitions-uri',
- 'image-tar-file'
+ 'container-image-tag',
+ 'container-image-pull-repository-type',
+ ['container-image-pull-repository-type', 'container-image-repository-type']
]
self.assertEqual(required_keys, expected_required_keys)
- def test__validate_required_config_or_previous_step_result_artifact_keys_valid(self):
+@patch("ploigos_step_runner.StepImplementer._validate_required_config_or_previous_step_result_artifact_keys")
+class TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
+ def test__valid(self, mock_super_validate):
step_config = {
'oscap-input-definitions-uri': 'https://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2',
'image-tar-file': 'does-not-matter'
@@ -68,7 +80,9 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_valid(s
step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
- def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_protocal(self):
+ mock_super_validate.assert_called_once_with()
+
+ def test_invalid_protocal(self, mock_super_validate):
oscap_input_definitions_uri = 'foo://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
@@ -94,7 +108,9 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_invalid
):
step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
- def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_extension(self):
+ mock_super_validate.assert_called_once_with()
+
+ def test_invalid_extension(self, mock_super_validate):
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.foo'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
@@ -120,144 +136,13 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_invalid
):
step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
- def test__validate_required_config_or_previous_step_result_artifact_keys_missing_required_keys(self):
- step_config = {}
- with TempDirectory() as temp_dir:
- parent_work_dir_path = os.path.join(temp_dir.path, 'working')
-
- step_implementer = self.create_step_implementer(
- step_config=step_config,
- step_name='test',
- implementer='OpenSCAP',
- parent_work_dir_path=parent_work_dir_path
- )
-
- with self.assertRaisesRegex(
- AssertionError,
- re.compile(
- r"Missing required step configuration or previous step result"
- r" artifact keys: \['oscap-input-definitions-uri', 'image-tar-file'\]"
- )
- ):
- step_implementer._validate_required_config_or_previous_step_result_artifact_keys()
-
- @patch('sh.buildah', create=True)
- def test___buildah_import_image_from_tar_success(self, buildah_mock):
- image_tar_file = "/does/not/matter.tar"
- container_name = "test"
-
- OpenSCAPGeneric._OpenSCAPGeneric__buildah_import_image_from_tar(
- image_tar_file=image_tar_file,
- container_name=container_name
- )
-
- buildah_mock.assert_called_once_with(
- 'from',
- '--storage-driver', 'vfs',
- '--name', container_name,
- f"docker-archive:{image_tar_file}",
- _out=Any(IOBase),
- _err=Any(IOBase),
- _tee='err'
- )
-
- @patch('sh.buildah', create=True)
- def test___buildah_import_image_from_tar_error(self, buildah_mock):
- image_tar_file = "/does/not/matter.tar"
- container_name = "test"
-
- buildah_mock.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error')
-
- with self.assertRaisesRegex(
- StepRunnerException,
- re.compile(
- rf"Error importing the image \({image_tar_file}\):"
- r".*RAN: buildah"
- r".*STDOUT:"
- r".*mock out"
- r".*STDERR:"
- r".*mock error",
- re.DOTALL
- )
- ):
- OpenSCAPGeneric._OpenSCAPGeneric__buildah_import_image_from_tar(
- image_tar_file=image_tar_file,
- container_name=container_name
- )
-
- buildah_mock.assert_called_once_with(
- 'from',
- '--storage-driver', 'vfs',
- '--name', container_name,
- f"docker-archive:{image_tar_file}",
- _out=Any(IOBase),
- _err=Any(IOBase),
- _tee='err'
- )
-
- @patch('sh.buildah', create=True)
- def test___buildah_mount_container_success(self, buildah_mock):
- buildah_unshare_command = sh.buildah.bake('unshare')
- container_name = "test"
-
- expected_mount_path = '/this/is/a/path'
- buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = create_sh_side_effect(
- mock_stdout=f"{expected_mount_path}",
- )
-
- container_mount_path = OpenSCAPGeneric._OpenSCAPGeneric__buildah_mount_container(
- buildah_unshare_command=buildah_unshare_command,
- container_id=container_name
- )
-
- self.assertEqual(container_mount_path, expected_mount_path)
-
- buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with(
- '--storage-driver', 'vfs',
- container_name,
- _out=Any(IOBase),
- _err=Any(IOBase),
- _tee='err'
- )
-
- @patch('sh.buildah', create=True)
- def test___buildah_mount_container_error(self, buildah_mock):
- buildah_unshare_command = sh.buildah.bake('unshare')
- container_name = "test"
-
- buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = sh.ErrorReturnCode(
- 'buildah mount',
- b'mock out',
- b'mock error'
- )
-
- with self.assertRaisesRegex(
- StepRunnerException,
- re.compile(
- rf"Error mounting container \({container_name}\):"
- r".*RAN: buildah"
- r".*STDOUT:"
- r".*mock out"
- r".*STDERR:"
- r".*mock error",
- re.DOTALL
- )
- ):
- OpenSCAPGeneric._OpenSCAPGeneric__buildah_mount_container(
- buildah_unshare_command=buildah_unshare_command,
- container_id=container_name
- )
-
- buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with(
- '--storage-driver', 'vfs',
- container_name,
- _out=Any(IOBase),
- _err=Any(IOBase),
- _tee='err'
- )
+ mock_super_validate.assert_called_once_with()
+class TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
@patch('sh.oscap', create=True)
- def test___get_oscap_document_type_sds(self, oscap_mock):
+ def test_sds(self, oscap_mock):
oscap_input_file = '/does/not/matter.xml'
sh.oscap.info.side_effect = create_sh_side_effect(
@@ -293,7 +178,7 @@ def test___get_oscap_document_type_sds(self, oscap_mock):
self.assertEqual(oscap_document_type, 'Source Data Stream')
@patch('sh.oscap', create=True)
- def test___get_oscap_document_type_oval(self, oscap_mock):
+ def test_oval(self, oscap_mock):
oscap_input_file = '/does/not/matter.xml'
sh.oscap.info.side_effect = create_sh_side_effect(
@@ -315,7 +200,7 @@ def test___get_oscap_document_type_oval(self, oscap_mock):
self.assertEqual(oscap_document_type, 'OVAL Definitions')
@patch('sh.oscap', create=True)
- def test___get_oscap_document_type_xccdf(self, oscap_mock):
+ def test_xccdf(self, oscap_mock):
oscap_input_file = '/does/not/matter.xml'
sh.oscap.info.side_effect = create_sh_side_effect(
@@ -356,7 +241,7 @@ def test___get_oscap_document_type_xccdf(self, oscap_mock):
self.assertEqual(oscap_document_type, 'XCCDF Checklist')
@patch('sh.oscap', create=True)
- def test___get_oscap_document_type_error(self, oscap_mock):
+ def test_error(self, oscap_mock):
oscap_input_file = '/does/not/matter.xml'
sh.oscap.info.side_effect = sh.ErrorReturnCode(
@@ -387,7 +272,10 @@ def test___get_oscap_document_type_error(self, oscap_mock):
_out=Any(IOBase)
)
- def test___get_oscap_eval_type_based_on_document_type_sds(self):
+class TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
+ def test_sds(self):
oscap_document_type = 'Source Data Stream'
oscap_eval_type = \
@@ -397,7 +285,7 @@ def test___get_oscap_eval_type_based_on_document_type_sds(self):
self.assertEqual(oscap_eval_type, 'xccdf')
- def test___get_oscap_eval_type_based_on_document_type_xccdf(self):
+ def test_xccdf(self):
oscap_document_type = 'XCCDF Checklist'
oscap_eval_type = \
@@ -407,7 +295,7 @@ def test___get_oscap_eval_type_based_on_document_type_xccdf(self):
self.assertEqual(oscap_eval_type, 'xccdf')
- def test___get_oscap_eval_type_based_on_document_type_oval(self):
+ def test_oval(self):
oscap_document_type = 'OVAL Definitions'
oscap_eval_type = \
@@ -417,7 +305,7 @@ def test___get_oscap_eval_type_based_on_document_type_oval(self):
self.assertEqual(oscap_eval_type, 'oval')
- def test___get_oscap_eval_type_based_on_document_type_unknown(self):
+ def test_unknown(self):
oscap_document_type = 'What is this nonsense?'
oscap_eval_type = \
@@ -427,7 +315,10 @@ def test___get_oscap_eval_type_based_on_document_type_unknown(self):
self.assertEqual(oscap_eval_type, None)
- def __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
+class TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
+ def __run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
self,
buildah_mock,
oscap_eval_type,
@@ -528,10 +419,8 @@ def __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
self.assertEqual(stdout, oscap_stdout_expected)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_xccdf_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -560,10 +449,8 @@ def test___run_oscap_scan_xccdf_do_no_fetch_remote_with_profile_all_pass(self, b
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_all_pass(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_xccdf_do_yes_fetch_remote_with_profile_all_pass(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -592,10 +479,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_all_pass(self,
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile=None,
@@ -624,10 +509,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, bu
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_with_fail(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_xccdf_do_yes_fetch_remote_with_profile_with_fail(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -674,10 +557,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_with_fail(self,
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_str_oscap_fetch_remote_resources_flag(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_str_oscap_fetch_remote_resources_flag(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -706,10 +587,8 @@ def test___run_oscap_scan_str_oscap_fetch_remote_resources_flag(self, buildah_mo
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='oval',
oscap_fetch_remote_resources=False,
@@ -727,10 +606,8 @@ def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, bu
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='oval',
oscap_fetch_remote_resources=False,
@@ -752,7 +629,7 @@ def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, bu
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock):
+ def test_xccdf_exit_code_1(self, buildah_mock):
with self.assertRaisesRegex(
StepRunnerException,
re.compile(
@@ -776,9 +653,7 @@ def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock):
re.DOTALL
)
):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -819,7 +694,7 @@ def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock):
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock):
+ def test_oval_exit_code_1(self, buildah_mock):
with self.assertRaisesRegex(
StepRunnerException,
re.compile(
@@ -835,9 +710,7 @@ def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock):
re.DOTALL
)
):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='oval',
oscap_fetch_remote_resources=False,
@@ -857,7 +730,7 @@ def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock):
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock):
+ def test_oval_exit_code_2(self, buildah_mock):
with self.assertRaisesRegex(
StepRunnerException,
re.compile(
@@ -873,9 +746,7 @@ def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock):
re.DOTALL
)
):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='oval',
oscap_fetch_remote_resources=False,
@@ -895,7 +766,7 @@ def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock):
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock):
+ def test_oval_exit_code_unknown(self, buildah_mock):
with self.assertRaisesRegex(
StepRunnerException,
re.compile(
@@ -911,9 +782,7 @@ def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock):
re.DOTALL
)
):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='oval',
oscap_fetch_remote_resources=False,
@@ -933,10 +802,8 @@ def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock):
)
@patch('sh.buildah', create=True)
- def test___run_oscap_scan_xccdf_with_tailoring_file(self, buildah_mock):
- TestStepImplementerSharedOpenSCAPGeneric. \
- __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass(
- self,
+ def test_xccdf_with_tailoring_file(self, buildah_mock):
+ self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass(
buildah_mock=buildah_mock,
oscap_eval_type='xccdf',
oscap_profile='this.is.real.i.sware',
@@ -965,57 +832,62 @@ def test___run_oscap_scan_xccdf_with_tailoring_file(self, buildah_mock):
"""
)
+class TestStepImplementerSharedOpenSCAPGeneric__run_step(
+ BaseTestStepImplementerSharedOpenSCAPGeneric
+):
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan')
- @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container')
+ @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container')
+ @patch(
+ 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image',
+ return_value='mock-image-working-container-mock-1'
+ )
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type')
@patch('sh.buildah', create=True)
- def test_run_step_pass(
+ def test_pass(
self,
buildah_mock,
get_oscap_document_type_mock,
- buildah_mount_container_mock,
+ create_container_from_image_mock,
+ mount_container_mock,
run_oscap_scan_mock
):
oscap_document_type = 'Source Data Stream'
oscap_eval_type = 'xccdf'
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2'
+ container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'oscap-profile': 'foo'
+ 'oscap-profile': 'foo',
+ 'container-image-tag': container_image_tag
}
- image_tar_file_name = 'my_awesome_app'
- image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar'
oscap_eval_success = True
oscap_eval_fails = None
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
mount_path = '/does/not/matter/container-mount'
- artifact_config = {
- 'image-tar-file': {'description': '', 'value': image_tar_file}
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='test',
implementer='OpenSCAP',
- workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
get_oscap_document_type_mock.return_value = oscap_document_type
- buildah_mount_container_mock.return_value = mount_path
+ mount_container_mock.return_value = mount_path
run_oscap_scan_mock.return_value = [
oscap_eval_success,
oscap_eval_fails
]
+ # run test
stdout_buff = StringIO()
with redirect_stdout(stdout_buff):
step_result = step_implementer._run_step()
+ # verify results
expected_results = StepResult(
step_name='test',
sub_step_name='OpenSCAP',
@@ -1037,14 +909,14 @@ def test_run_step_pass(
self.assertEqual(expected_results, step_result)
stdout = stdout_buff.getvalue()
-
+ expected_container_name = 'mock-image-working-container-mock-1'
self.assertRegex(
stdout,
re.compile(
- rf".*Import image: {image_tar_file}"
- rf".*Imported image: {image_tar_file}"
- rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*"
- rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'"
+ rf".*Create container from image \({container_image_tag}\)"
+ rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)"
+ rf".*Mount container: {expected_container_name}"
+ rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'"
rf".*Download input definitions: {oscap_input_definitions_uri}"
rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml"
rf".*Determine OpenSCAP document type of input file: /.+/working/test/rhel\-8\.ds\.xml"
@@ -1058,25 +930,30 @@ def test_run_step_pass(
)
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan')
- @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container')
+ @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container')
+ @patch(
+ 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image',
+ return_value='mock-image-working-container-mock-1'
+ )
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type')
@patch('sh.buildah', create=True)
- def test_run_step_fail(
+ def test_fail(
self,
buildah_mock,
get_oscap_document_type_mock,
- buildah_mount_container_mock,
+ create_container_from_image_mock,
+ mount_container_mock,
run_oscap_scan_mock
):
oscap_document_type = 'Source Data Stream'
oscap_eval_type = 'xccdf'
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2'
+ container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'oscap-profile': 'foo'
+ 'oscap-profile': 'foo',
+ 'container-image-tag': container_image_tag
}
- image_tar_file_name = 'my_awesome_app'
- image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar'
oscap_eval_success = False
oscap_eval_fails = """
Title Install dnf-automatic Package
@@ -1089,25 +966,20 @@ def test_run_step_fail(
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
mount_path = '/does/not/matter/container-mount'
- artifact_config = {
- 'image-tar-file': {'description': '', 'value': image_tar_file}
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='test',
implementer='OpenSCAP',
- workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
get_oscap_document_type_mock.return_value = oscap_document_type
- buildah_mount_container_mock.return_value = mount_path
+ mount_container_mock.return_value = mount_path
run_oscap_scan_mock.return_value = [
oscap_eval_success,
oscap_eval_fails
]
+
stdout_buff = StringIO()
with redirect_stdout(stdout_buff):
step_result = step_implementer._run_step()
@@ -1137,14 +1009,14 @@ def test_run_step_fail(
self.assertEqual(expected_results, step_result)
stdout = stdout_buff.getvalue()
-
+ expected_container_name = 'mock-image-working-container-mock-1'
self.assertRegex(
stdout,
re.compile(
- rf".*Import image: {image_tar_file}"
- rf".*Imported image: {image_tar_file}"
- rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*"
- rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'"
+ rf".*Create container from image \({container_image_tag}\)"
+ rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)"
+ rf".*Mount container: {expected_container_name}"
+ rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'"
rf".*Download input definitions: {oscap_input_definitions_uri}"
rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml"
rf".*Determine OpenSCAP document type of input file: /.+/working/test/rhel\-8\.ds\.xml"
@@ -1158,27 +1030,32 @@ def test_run_step_fail(
)
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan')
- @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container')
+ @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container')
+ @patch(
+ 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image',
+ return_value='mock-image-working-container-mock-1'
+ )
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type')
@patch('sh.buildah', create=True)
- def test_run_step_pass_with_tailoring_file(
+ def test_pass_with_tailoring_file(
self,
buildah_mock,
get_oscap_document_type_mock,
- buildah_mount_container_mock,
+ create_container_from_image_mock,
+ mount_container_mock,
run_oscap_scan_mock
):
oscap_document_type = 'Source Data Stream'
oscap_eval_type = 'xccdf'
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2'
+ container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock'
oscap_tailoring_uri = 'https://raw.githubusercontent.com/ploigos/ploigos-example-oscap-content/main/xccdf_com.redhat.ploigos_profile_example_ubi8-tailoring-xccdf.xml'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
'oscap-tailoring-uri': oscap_tailoring_uri,
- 'oscap-profile': 'foo'
+ 'oscap-profile': 'foo',
+ 'container-image-tag': container_image_tag
}
- image_tar_file_name = 'my_awesome_app'
- image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar'
oscap_eval_success = True
oscap_eval_fails = None
@@ -1186,21 +1063,15 @@ def test_run_step_pass_with_tailoring_file(
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
mount_path = '/does/not/matter/container-mount'
- artifact_config = {
- 'image-tar-file': {'description': '', 'value': image_tar_file}
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='test',
implementer='OpenSCAP',
- workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
get_oscap_document_type_mock.return_value = oscap_document_type
- buildah_mount_container_mock.return_value = mount_path
+ mount_container_mock.return_value = mount_path
run_oscap_scan_mock.return_value = [
oscap_eval_success,
oscap_eval_fails
@@ -1208,7 +1079,7 @@ def test_run_step_pass_with_tailoring_file(
stdout_buff = StringIO()
with redirect_stdout(stdout_buff):
- result = step_implementer._run_step()
+ step_result = step_implementer._run_step()
expected_results = StepResult(
step_name='test',
@@ -1228,16 +1099,18 @@ def test_run_step_pass_with_tailoring_file(
name='stdout-report',
value=f"{step_implementer.work_dir_path}/oscap-xccdf-out"
)
- self.assertEqual(expected_results, result)
+ self.assertEqual(expected_results, step_result)
stdout = stdout_buff.getvalue()
+
+ expected_container_name = 'mock-image-working-container-mock-1'
self.assertRegex(
stdout,
re.compile(
- rf".*Import image: {image_tar_file}"
- rf".*Imported image: {image_tar_file}"
- rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*"
- rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'"
+ rf".*Create container from image \({container_image_tag}\)"
+ rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)"
+ rf".*Mount container: {expected_container_name}"
+ rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'"
rf".*Download input definitions: {oscap_input_definitions_uri}"
rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml"
rf".*Download oscap tailoring file: {oscap_tailoring_uri}"
@@ -1254,25 +1127,31 @@ def test_run_step_pass_with_tailoring_file(
@patch('ploigos_step_runner.step_implementers.shared.openscap_generic.download_and_decompress_source_to_destination')
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan')
- @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container')
+ @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container')
+ @patch(
+ 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image',
+ return_value='mock-image-working-container-mock-1'
+ )
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type')
@patch('sh.buildah', create=True)
- def test_run_step_fail_downloading_open_scap_input_file(
+ def test_fail_downloading_open_scap_input_file(
self,
buildah_mock,
get_oscap_document_type_mock,
- buildah_mount_container_mock,
+ create_container_from_image_mock,
+ mount_container_mock,
run_oscap_scan_mock,
download_and_decompress_source_to_destination_mock
):
oscap_document_type = 'Source Data Stream'
+ oscap_eval_type = 'xccdf'
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2'
+ container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
- 'oscap-profile': 'foo'
+ 'oscap-profile': 'foo',
+ 'container-image-tag': container_image_tag
}
- image_tar_file_name = 'my_awesome_app'
- image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar'
oscap_eval_success = True
oscap_eval_fails = None
@@ -1280,21 +1159,15 @@ def test_run_step_fail_downloading_open_scap_input_file(
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
mount_path = '/does/not/matter/container-mount'
- artifact_config = {
- 'image-tar-file': {'description': '', 'value': image_tar_file}
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='test',
implementer='OpenSCAP',
- workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
get_oscap_document_type_mock.return_value = oscap_document_type
- buildah_mount_container_mock.return_value = mount_path
+ mount_container_mock.return_value = mount_path
run_oscap_scan_mock.return_value = [
oscap_eval_success,
oscap_eval_fails
@@ -1305,10 +1178,12 @@ def test_run_step_fail_downloading_open_scap_input_file(
mock_error_msg
)
+ # run test
stdout_buff = StringIO()
with redirect_stdout(stdout_buff):
step_result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='test',
sub_step_name='OpenSCAP',
@@ -1317,18 +1192,17 @@ def test_run_step_fail_downloading_open_scap_input_file(
expected_step_result.success = False
expected_step_result.message = 'Error downloading OpenSCAP input file: ' \
f'{mock_error_msg}'
-
- self.assertEqual(step_result.get_step_result_dict(), expected_step_result.get_step_result_dict())
+ self.assertEqual(expected_step_result, step_result)
stdout = stdout_buff.getvalue()
-
+ expected_container_name = 'mock-image-working-container-mock-1'
self.assertRegex(
stdout,
re.compile(
- rf".*Import image: {image_tar_file}"
- rf".*Imported image: {image_tar_file}"
- rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*"
- rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'"
+ rf".*Create container from image \({container_image_tag}\)"
+ rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)"
+ rf".*Mount container: {expected_container_name}"
+ rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'"
rf".*Download input definitions: {oscap_input_definitions_uri}",
re.DOTALL
)
@@ -1336,54 +1210,56 @@ def test_run_step_fail_downloading_open_scap_input_file(
@patch('ploigos_step_runner.step_implementers.shared.openscap_generic.download_and_decompress_source_to_destination')
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan')
- @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container')
+ @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container')
+ @patch(
+ 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image',
+ return_value='mock-image-working-container-mock-1'
+ )
@patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type')
@patch('sh.buildah', create=True)
- def test_run_step_fail_downloading_open_scap_tailoring_file(
+ def test_fail_downloading_open_scap_tailoring_file(
self,
buildah_mock,
get_oscap_document_type_mock,
- buildah_mount_container_mock,
+ create_container_from_image_mock,
+ mount_container_mock,
run_oscap_scan_mock,
download_and_decompress_source_to_destination_mock
):
oscap_document_type = 'Source Data Stream'
+ oscap_eval_type = 'xccdf'
oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2'
+ container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock'
oscap_tailoring_uri = 'https://raw.githubusercontent.com/ploigos/ploigos-example-oscap-content/main/xccdf_com.redhat.ploigos_profile_example_ubi8-tailoring-xccdf.xml'
step_config = {
'oscap-input-definitions-uri': oscap_input_definitions_uri,
+ 'oscap-profile': 'foo',
'oscap-tailoring-uri': oscap_tailoring_uri,
- 'oscap-profile': 'foo'
+ 'container-image-tag': container_image_tag
}
- image_tar_file_name = 'my_awesome_app'
- image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar'
oscap_eval_success = True
oscap_eval_fails = None
with TempDirectory() as temp_dir:
+ # setup test
parent_work_dir_path = os.path.join(temp_dir.path, 'working')
mount_path = '/does/not/matter/container-mount'
- artifact_config = {
- 'image-tar-file': {'description': '', 'value': image_tar_file}
- }
- workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config)
-
step_implementer = self.create_step_implementer(
step_config=step_config,
step_name='test',
implementer='OpenSCAP',
- workflow_result=workflow_result,
parent_work_dir_path=parent_work_dir_path
)
get_oscap_document_type_mock.return_value = oscap_document_type
- buildah_mount_container_mock.return_value = mount_path
+ mount_container_mock.return_value = mount_path
run_oscap_scan_mock.return_value = [
oscap_eval_success,
oscap_eval_fails
]
+ # run test with mock error
mock_error_msg = 'mock error downloading open scap file'
download_and_decompress_source_to_destination_mock.side_effect = [
"foo",
@@ -1394,6 +1270,7 @@ def test_run_step_fail_downloading_open_scap_tailoring_file(
with redirect_stdout(stdout_buff):
step_result = step_implementer._run_step()
+ # verify results
expected_step_result = StepResult(
step_name='test',
sub_step_name='OpenSCAP',
@@ -1402,19 +1279,20 @@ def test_run_step_fail_downloading_open_scap_tailoring_file(
expected_step_result.success = False
expected_step_result.message = 'Error downloading OpenSCAP tailoring file: ' \
f'{mock_error_msg}'
-
- self.assertEqual(step_result.get_step_result_dict(), expected_step_result.get_step_result_dict())
+ self.assertEqual(expected_step_result, step_result)
stdout = stdout_buff.getvalue()
-
+ expected_container_name = 'mock-image-working-container-mock-1'
self.assertRegex(
stdout,
re.compile(
- rf".*Import image: {image_tar_file}"
- rf".*Imported image: {image_tar_file}"
- rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*"
- rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'"
- rf".*Download input definitions: {oscap_input_definitions_uri}",
+ rf".*Create container from image \({container_image_tag}\)"
+ rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)"
+ rf".*Mount container: {expected_container_name}"
+ rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'"
+ rf".*Download input definitions: {oscap_input_definitions_uri}"
+ rf".*Downloaded input definitions to: foo"
+ rf".*Download oscap tailoring file: {oscap_tailoring_uri}",
re.DOTALL
)
)
diff --git a/tests/step_implementers/sign_container_image/test_podman_sign.py b/tests/step_implementers/sign_container_image/test_podman_sign.py
index f5b275db4..7cb10025d 100644
--- a/tests/step_implementers/sign_container_image/test_podman_sign.py
+++ b/tests/step_implementers/sign_container_image/test_podman_sign.py
@@ -97,7 +97,7 @@ def test__required_config_or_result_keys(self):
required_keys = PodmanSign._required_config_or_result_keys()
expected_required_keys = [
['signer-pgp-private-key', 'container-image-signer-pgp-private-key'],
- 'container-image-tag'
+ ['container-image-push-tag', 'container-image-tag']
]
self.assertEqual(required_keys, expected_required_keys)
@@ -162,12 +162,15 @@ def sign_image_side_effect(
containers_config_tls_verify=True,
container_command_short_name='podman'
)
-
expected_step_result = StepResult(
step_name='sign-container-image',
sub_step_name='PodmanSign',
sub_step_implementer_name='PodmanSign'
)
+ expected_step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
expected_step_result.add_artifact(
name='container-image-signature-file-path',
value= os.path.join(
@@ -256,6 +259,10 @@ def sign_image_side_effect(
sub_step_name='PodmanSign',
sub_step_implementer_name='PodmanSign'
)
+ expected_step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
expected_step_result.add_artifact(
name='container-image-signature-file-path',
value= os.path.join(
@@ -359,6 +366,14 @@ def sign_image_side_effect(
sub_step_name='PodmanSign',
sub_step_implementer_name='PodmanSign'
)
+ expected_step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
+ expected_step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
expected_step_result.add_artifact(
name='container-image-signature-file-path',
value= os.path.join(
@@ -463,6 +478,10 @@ def sign_image_side_effect(
sub_step_name='PodmanSign',
sub_step_implementer_name='PodmanSign'
)
+ expected_step_result.add_artifact(
+ name='container-image-signed-tag',
+ value=container_image_tag,
+ )
expected_step_result.add_artifact(
name='container-image-signature-file-path',
value= os.path.join(
@@ -487,9 +506,8 @@ def sign_image_side_effect(
expected_step_result.success = False
expected_step_result.message = 'mock upload error'
- print(expected_step_result)
- print(result)
self.assertEqual(expected_step_result, result)
+
@patch('ploigos_step_runner.step_implementers.sign_container_image.podman_sign.import_pgp_key')
@patch.object(PodmanSign, '_PodmanSign__sign_image')
def test_run_step_fail_import_pgp_key(self, sign_image_mock, import_pgp_key_mock):
@@ -538,8 +556,6 @@ def sign_image_side_effect(
expected_step_result.success = False
expected_step_result.message = 'mock error importing pgp key'
- print(expected_step_result)
- print(result)
self.assertEqual(expected_step_result, result)
@patch('ploigos_step_runner.step_implementers.sign_container_image.podman_sign.import_pgp_key')
diff --git a/tests/utils/test_containers.py b/tests/utils/test_containers.py
index 0d7d211c6..c7a6d2cdb 100644
--- a/tests/utils/test_containers.py
+++ b/tests/utils/test_containers.py
@@ -4,8 +4,7 @@
import sh
from ploigos_step_runner.config import ConfigValue
-from ploigos_step_runner.utils.containers import (container_registries_login,
- container_registry_login)
+from ploigos_step_runner.utils.containers import *
from tests.helpers.base_test_case import BaseTestCase
from tests.helpers.test_utils import *
@@ -992,3 +991,145 @@ def test_list_of_dicts_falsey(self, container_registry_login_mock):
)
]
container_registry_login_mock.assert_has_calls(calls)
+
+class Test_create_container_from_image(BaseTestCase):
+ @patch('sh.buildah', create=True)
+ def test_success_default_repository_type(self, buildah_mock):
+ # setup test
+ image_tag = "localhost/mock-org/mock-image:v0.42.0-mock"
+
+ # run test
+ buildah_mock.side_effect = create_sh_side_effect(
+ mock_stdout='mock-image-working-container-mock'
+ )
+ actual_container_name = create_container_from_image(
+ image_tag=image_tag
+ )
+
+ # verify
+ self.assertEqual(actual_container_name, 'mock-image-working-container-mock')
+
+ buildah_mock.assert_called_once_with(
+ 'from',
+ f"container-storage:{image_tag}",
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
+
+ @patch('sh.buildah', create=True)
+ def test_success_remote_repository_type(self, buildah_mock):
+ # setup test
+ image_tag = "quay.io/mock-org/mock-image:v0.42.0-mock"
+
+ # run test
+ buildah_mock.side_effect = create_sh_side_effect(
+ mock_stdout='mock-image-working-container-mock'
+ )
+ actual_container_name = create_container_from_image(
+ image_tag=image_tag,
+ repository_type='docker://'
+ )
+
+ # verify
+ self.assertEqual(actual_container_name, 'mock-image-working-container-mock')
+
+ buildah_mock.assert_called_once_with(
+ 'from',
+ f"docker://{image_tag}",
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
+
+ @patch('sh.buildah', create=True)
+ def test_error(self, buildah_mock):
+ # setup test
+ image_tag = "localhost/mock-org/mock-image:v0.42.0-mock"
+
+ # run test with mock error
+ with self.assertRaisesRegex(
+ RuntimeError,
+ re.compile(
+ rf"Error creating container from image \({image_tag}\):"
+ r".*RAN: buildah"
+ r".*STDOUT:"
+ r".*mock out"
+ r".*STDERR:"
+ r".*mock error",
+ re.DOTALL
+ )
+ ):
+ buildah_mock.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error')
+ create_container_from_image(
+ image_tag=image_tag
+ )
+
+ # verify
+ buildah_mock.assert_called_once_with(
+ 'from',
+ f"container-storage:{image_tag}",
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
+
+class Test_mount_container(BaseTestCase):
+ @patch('sh.buildah', create=True)
+ def test_success(self, buildah_mock):
+ buildah_unshare_command = sh.buildah.bake('unshare')
+ container_name = "test"
+
+ expected_mount_path = '/this/is/a/path'
+ buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = create_sh_side_effect(
+ mock_stdout=f"{expected_mount_path}",
+ )
+
+ container_mount_path = mount_container(
+ buildah_unshare_command=buildah_unshare_command,
+ container_id=container_name
+ )
+
+ self.assertEqual(container_mount_path, expected_mount_path)
+
+ buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with(
+ container_name,
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
+
+ @patch('sh.buildah', create=True)
+ def test_buildah_error(self, buildah_mock):
+ buildah_unshare_command = sh.buildah.bake('unshare')
+ container_name = "test"
+
+ buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = sh.ErrorReturnCode(
+ 'buildah mount',
+ b'mock out',
+ b'mock error'
+ )
+
+ with self.assertRaisesRegex(
+ RuntimeError,
+ re.compile(
+ rf"Error mounting container \({container_name}\):"
+ r".*RAN: buildah"
+ r".*STDOUT:"
+ r".*mock out"
+ r".*STDERR:"
+ r".*mock error",
+ re.DOTALL
+ )
+ ):
+ mount_container(
+ buildah_unshare_command=buildah_unshare_command,
+ container_id=container_name
+ )
+
+ buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with(
+ container_name,
+ _out=Any(IOBase),
+ _err=Any(IOBase),
+ _tee='err'
+ )
\ No newline at end of file