From e0c8bac653a1846af8cedbf614fb54f48b2a3f80 Mon Sep 17 00:00:00 2001 From: Ian Tewksbury Date: Mon, 2 Aug 2021 14:22:14 -0400 Subject: [PATCH] use local container registry on persistent volume rather then tar files --- .../results/workflow_result.py | 4 +- .../openscap.py | 17 +- .../openscap.py | 17 +- .../create_container_image/buildah.py | 202 +++++--- .../push_container_image/skopeo.py | 210 +++++--- .../shared/openscap_generic.py | 160 ++---- .../sign_container_image/podman_sign.py | 12 +- src/ploigos_step_runner/utils/containers.py | 100 ++++ .../test_openscap_compliance.py | 178 ++----- .../test_openscap_vulnerability.py | 49 +- .../test_buildah_create_container_image.py | 454 ++++++++++++----- .../test_skopeo_push_container_image.py | 391 ++++++++++---- .../shared/test_openscap_generic.py | 476 +++++++----------- .../sign_container_image/test_podman_sign.py | 28 +- tests/utils/test_containers.py | 145 +++++- 15 files changed, 1514 insertions(+), 929 deletions(-) diff --git a/src/ploigos_step_runner/results/workflow_result.py b/src/ploigos_step_runner/results/workflow_result.py index 4109ead9..3375fe74 100644 --- a/src/ploigos_step_runner/results/workflow_result.py +++ b/src/ploigos_step_runner/results/workflow_result.py @@ -35,8 +35,8 @@ def get_artifact_value( ): # pylint: disable=too-many-boolean-expressions """Search for an artifact. - If step_name, sub_step_name, or environment are provided ensure the artifact comes - from the first + If step_name, sub_step_name, or environment are not provided ensure the artifact comes + from the last step that returned that artifact. 1. if step_name is provided, look for the artifact in the step 2. elif step_name and sub_step_name is provided, look for the artifact in the step/sub_step diff --git a/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py b/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py index b5e53423..efea4d12 100644 --- a/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py +++ b/src/ploigos_step_runner/step_implementers/container_image_static_compliance_scan/openscap.py @@ -14,16 +14,15 @@ Configuration Key | Required? | Default | Description -------------------------------|-----------|---------|----------- -`image-tar-file` | Yes | | Path to container image tar file to scan +`container-image-tag` | Yes | | Container image tag to scan. `oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \ to do the evaluation with. \ - Must use protocol file://|http://|https://. - | | | Must have file extension .xml|.bz2. -`oscap-profile` | Yes | | OpenSCAP profile to evaluate. + Must use protocol file://|http://|https://. \ + Must have file extension .xml|.bz2. +`oscap-profile` | No | | OpenSCAP profile to evaluate. `oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \ to do the evaluation with. \ - Must use protocol \ - file://|http://|https://. \ + Must use protocol file://|http://|https://. \ Must have file extension .xml|.bz2. `oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \ that have remote references fetch them if \ @@ -34,6 +33,12 @@ remote resources and this is not True. \ For disconnected environments the remote \ internal mirror. +`[container-image-pull-repository-type, container-image-repository-type]` \ + | Yes | 'containers-storage:' \ + | \ + Container repository type for the pull image source. \ + See https://github.com/containers/skopeo for valid \ + options. Result Artifacts ---------------- diff --git a/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py b/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py index e4549675..f46d847e 100644 --- a/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py +++ b/src/ploigos_step_runner/step_implementers/container_image_static_vulnerability_scan/openscap.py @@ -13,17 +13,16 @@ * previous step results Configuration Key | Required? | Default | Description --------------------------------|-----------|---------|------------ -`image-tar-file` | Yes | | Path to container image tar file to scan +-------------------------------|-----------|---------|----------- +`container-image-tag` | Yes | | Container image tag to scan. `oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \ to do the evaluation with. \ - Must use protocol file://|http://|https://. - | | | Must have file extension .xml|.bz2. + Must use protocol file://|http://|https://. \ + Must have file extension .xml|.bz2. `oscap-profile` | No | | OpenSCAP profile to evaluate. `oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \ to do the evaluation with. \ - Must use protocol \ - file://|http://|https://. \ + Must use protocol file://|http://|https://. \ Must have file extension .xml|.bz2. `oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \ that have remote references fetch them if \ @@ -34,6 +33,12 @@ remote resources and this is not True. \ For disconnected environments the remote \ internal mirror. +`[container-image-pull-repository-type, container-image-repository-type]` \ + | Yes | 'containers-storage:' \ + | \ + Container repository type for the pull image source. \ + See https://github.com/containers/skopeo for valid \ + options. Result Artifacts ---------------- diff --git a/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py b/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py index 827804a8..a1b31dd7 100644 --- a/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py +++ b/src/ploigos_step_runner/step_implementers/create_container_image/buildah.py @@ -9,41 +9,64 @@ * runtime configuration * previous step results -Configuration Key | Required? | Default | Description -------------------|-----------|---------|----------- -`imagespecfile` | True | `'Containerfile'` \ - | File defining the container image -`context` | True | `'.'` | Context to build the container image in -`tls-verify` | True | `True` | Whether to verify TLS when pulling parent images -`format` | True | `'oci'` | format of the built image's manifest and metadata +Configuration Key | Required? | Default | Description +-------------------|-----------|---------|----------- +`imagespecfile` | True | `'Containerfile'` \ + | File defining the container image +`context` | True | `'.'` | Context to build the container image in +`tls-verify` | True | `True` | Whether to verify TLS when pulling parent images +`format` | True | `'oci'` | format of the built image's manifest and metadata `containers-config-auth-file` \ - | True | `'~/.buildah-auth.json'` \ - | Path to the container registry authentication \ - file to use for container registry authentication. + | False | | Path to the container registry authentication \ + file to use for container registry authentication. \ + If one is not provided one will be created in the \ + working directory. `container-image-version` \ - | True | | Version to use when building the container image + | True | | Version to use when building the container image +`organization` | True | | Used in built container image tag +`application_name` | True | | Used in built container image tag +`service_name` | True | | Used in built container image tag +`container-registries` \ + | False | | Hash of container registries to authenticate with. Result Artifacts ---------------- Results artifacts output by this step. -Result Artifact Key | Description ---------------------------|------------ -`container-image-version` | Container version to tag built image with -`image-tar-file` | Path to the built container image as a tar file -""" +Result Artifact Key | Description +-------------------------------|------------ +`container-image-registry-uri` | Registry URI poriton of the container image tag \ + of the built container image. +`container-image-registry-organization` \ + | Organization portion of the container image tag \ + of the built container image. +`container-image-repository` | Repository portion of the container image tag \ + of the built container image. +`container-image-name` | Another way to reference the \ + repository portion of the container image tag \ + of the built container image. +`container-image-version` | Version portion of the container image tag \ + of the built container image. +`container-image-tag` | Full container image tag of the built container, \ + including the registry URI.
\ + Takes the form of: \ + `container-image-registry-organization/container-image-repository:container-image-version` +`container-image-short-tag` | Short container image tag of the built container image, \ + excluding the registry URI.
\ + Takes the form of: \ + `container-image-registry-uri/container-image-registry-organization/container-image-repository:container-image-version` + +""" # pylint: disable=line-too-long + import os import sys -from pathlib import Path +from distutils import util import sh from ploigos_step_runner import StepImplementer, StepResult from ploigos_step_runner.utils.containers import container_registries_login DEFAULT_CONFIG = { - # Path to the container registry authentication file to read and write to/from. - 'containers-config-auth-file': os.path.join(Path.home(), '.buildah-auth.json'), - # Image specification file name 'imagespecfile': 'Containerfile', @@ -58,11 +81,11 @@ } REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ - 'containers-config-auth-file', 'imagespecfile', 'context', 'tls-verify', 'format', + 'organization', 'service-name', 'application-name' ] @@ -103,6 +126,29 @@ def _required_config_or_result_keys(): """ return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS + def _validate_required_config_or_previous_step_result_artifact_keys(self): + """Validates that the required configuration keys or previous step result artifacts + are set and have valid values. + + Validates that: + * required configuration is given + * given 'imagespecfile' exists + + Raises + ------ + AssertionError + If step configuration or previous step result artifacts have invalid required values + """ + super()._validate_required_config_or_previous_step_result_artifact_keys() + + # if pom-file has value verify file exists + # If it doesn't have value and is required function will have already failed + image_spec_file = self.get_value('imagespecfile') + context = self.get_value('context') + image_spec_file_full_path = os.path.join(context, image_spec_file) + assert os.path.exists(image_spec_file_full_path), \ + f'Given imagespecfile ({image_spec_file}) does not exist in given context ({context}).' + def _run_step(self): """Runs the step implemented by this StepImplementer. @@ -113,38 +159,33 @@ def _run_step(self): """ step_result = StepResult.from_step_implementer(self) - context = self.get_value('context') + # get config image_spec_file = self.get_value('imagespecfile') - image_spec_file_location = os.path.join(context, image_spec_file) - application_name = self.get_value('application-name') - service_name = self.get_value('service-name') tls_verify = self.get_value('tls-verify') + if isinstance(tls_verify, str): + tls_verify = bool(util.strtobool(tls_verify)) - if not os.path.exists(image_spec_file_location): - step_result.success = False - step_result.message = 'Image specification file does not exist in location: ' \ - f'{image_spec_file_location}' - return step_result - - image_tag_version = self.get_value('container-image-version') - if image_tag_version is None: - image_tag_version = 'latest' + # create local build tag + image_version = self.get_value('container-image-version') + if image_version is None: + image_version = 'latest' print('No image tag version found in metadata. Using latest') - - destination = "localhost/{application_name}/{service_name}".format( - application_name=application_name, - service_name=service_name - ) - tag = "{destination}:{version}".format( - destination=destination, - version=image_tag_version - ) + image_registry_uri = 'localhost' + image_registry_organization = self.get_value('organization') + image_repository = f"{self.get_value('application-name')}-{self.get_value('service-name')}" + short_tag = f"{image_registry_organization}/{image_repository}:{image_version}" + build_tag = f"{image_registry_uri}/{short_tag}" try: # login to any provider container registries # NOTE: important to specify the auth file because depending on the context this is # being run in python process may not have permissions to default location containers_config_auth_file = self.get_value('containers-config-auth-file') + if not containers_config_auth_file: + containers_config_auth_file = os.path.join( + self.work_dir_path, + 'container-auth.json' + ) container_registries_login( registries=self.get_value('container-registries'), containers_config_auth_file=containers_config_auth_file, @@ -156,48 +197,61 @@ def _run_step(self): '--format=' + self.get_value('format'), '--tls-verify=' + str(tls_verify).lower(), '--layers', '-f', image_spec_file, - '-t', tag, + '-t', build_tag, '--authfile', containers_config_auth_file, - context, + self.get_value('context'), _out=sys.stdout, _err=sys.stderr, _tee='err' ) - step_result.add_artifact( - name='container-image-version', - value=tag - ) except sh.ErrorReturnCode as error: # pylint: disable=undefined-variable step_result.success = False step_result.message = 'Issue invoking buildah bud with given image ' \ f'specification file ({image_spec_file}): {error}' return step_result - image_tar_file = f'image-{application_name}-{service_name}-{image_tag_version}.tar' - image_tar_path = os.path.join(self.work_dir_path, image_tar_file) - try: - # Check to see if the tar docker-archive file already exists - # this needs to be run as buildah does not support overwritting - # existing files. - if os.path.exists(image_tar_path): - os.remove(image_tar_path) - sh.buildah.push( # pylint: disable=no-member - tag, - "docker-archive:" + image_tar_path, - _out=sys.stdout, - _err=sys.stderr, - _tee='err' - ) - - step_result.add_artifact( - name='image-tar-file', - value=image_tar_path - ) - except sh.ErrorReturnCode as error: # pylint: disable=undefined-variable - step_result.success = False - step_result.message = f'Issue invoking buildah push to tar file ' \ - f'({image_tar_path}): {error}' - return step_result + # add artifacts + step_result.add_artifact( + name='container-image-registry-uri', + value=image_registry_uri, + description='Registry URI poriton of the container image tag' \ + ' of the built container image.' + ) + step_result.add_artifact( + name='container-image-registry-organization', + value=image_registry_organization, + description='Organization portion of the container image tag' \ + ' of the built container image.' + ) + step_result.add_artifact( + name='container-image-repository', + value=image_repository, + description='Repository portion of the container image tag' \ + ' of the built container image.' + ) + step_result.add_artifact( + name='container-image-name', + value=image_repository, + description='Another way to reference the' \ + ' repository portion of the container image tag of the built container image.' + ) + step_result.add_artifact( + name='container-image-version', + value=image_version, + description='Version portion of the container image tag of the built container image.' + ) + step_result.add_artifact( + name='container-image-tag', + value=build_tag, + description='Full container image tag of the built container,' \ + ' including the registry URI.' + ) + step_result.add_artifact( + name='container-image-short-tag', + value=short_tag, + description='Short container image tag of the built container image,' \ + ' excluding the registry URI.' + ) return step_result diff --git a/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py b/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py index 512b8645..8b096dec 100644 --- a/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py +++ b/src/ploigos_step_runner/step_implementers/push_container_image/skopeo.py @@ -9,66 +9,97 @@ * runtime configuration * previous step results -Configuration Key | Required? | Default | Description -------------------|-----------|----------|----------- -`destination-url` | Yes | | Container image repository destination to push image \ - to. o not include the `docker://` prefix as it will \ - automatically be applied -`src-tls-verify` | Yes | `'true'` | Whether to very TLS for source of image -`dest-tls-verify` | Yes | `'true'` | Whether to verify TLS for destination of image -`containers-config-auth-file` | Yes | `'~/.container-image-repo-auth.json'` | \ - Path to the container registry authentication file \ - to use for container registry authentication. -`container-image-version` | Yes | | Tag to push container image with -`image-tar-file` | Yes | | Local tar file of container image to push +Configuration Key | Required? | Default | Description +-------------------|-----------|---------|----------- +`container-image-version` \ + | Yes | | Version to use when pushing the container image +`organization` | Yes | | Used in creating the container image push tag +`application_name` | Yes | | Used in creating the container image push tag +`service_name` | Yes | | Used in creating the container image push tag +`destination-url` | Yes | | Container image repository destination to push image \ + to.
\ + Should not include the repository type. +`[source-tls,verify, src-tls-verify]` \ + | Yes | `True` | Whether to verify TLS when pulling source image. +`dest-tls-verify` | Yes | `True` | Whether to verify TLS when pushing destination image. +`[container-image-pull-tag, container-image-tag]` \ + | Yes | | Container image tag of image to push to \ + `destination-url`. +`[container-image-pull-repository-type, container-image-repository-type]` \ + | Yes | 'containers-storage:' \ + | Container repository type for the pull image source. \ + See https://github.com/containers/skopeo for valid \ + options. +`[container-image-push-repository-type, container-image-repository-type]` \ + | Yes | 'docker://' \ + | Container repository type for the push image source. \ + See https://github.com/containers/skopeo for valid \ + options. +`containers-config-auth-file` \ + | No | | Path to the container registry authentication file \ + to use for container registry authentication. \ + If one is not provided one will be created in the \ + working directory. +`container-registries` \ + | False | | Hash of container registries to authenticate with. + Result Artifacts ---------------- Results artifacts output by this step. -Result Artifact Key | Description -----------------------------------------|------------ -`container-image-registry-uri` | URI to the image registry service. -`container-image-registry-organization` | Organization in the image registry to push the image to. -`container-image-repository` | Repository in the Organization in the image registry to \ - push the image to. -`container-image-name` | Name of the image to push to the Image Repository. \ - This is the same value as `container-image-repository` as\ - these are always the same, but people refer to them \ - differently in different cases, so providing both. -`container-image-version` | Version of the image to push. -`container-image-tag` | Tag container image was pushed with.
\ - Takes the form of: \ - "`container-image-registry-uri`\ - /`container-image-registry-organization`\ - /`container-image-repository`\ - :`container-image-version`" -""" +Result Artifact Key | Description +-------------------------------|------------ +`container-image-registry-uri` | Registry URI poriton of the container image tag \ + of the pushed container image. +`container-image-registry-organization` \ + | Organization portion of the container image tag \ + of the pushed container image. +`container-image-repository` | Repository portion of the container image tag \ + of the pushed container image. +`container-image-name` | Another way to reference the \ + repository portion of the container image tag \ + of the pushed container image. +`container-image-version` | Version portion of the container image tag \ + of the pushed container image. +`container-image-tag` | Full container image tag of the pushed container, \ + including the registry URI.
\ + Takes the form of: \ + `container-image-registry-organization/container-image-repository:container-image-version` +`container-image-short-tag` | Short container image tag of the pushed container image, \ + excluding the registry URI.
\ + Takes the form of: \ + `container-image-registry-uri/container-image-registry-organization/container-image-repository:container-image-version` + +""" # pylint: disable=line-too-long + import os import sys from distutils import util -from pathlib import Path import sh from ploigos_step_runner import StepImplementer, StepResult from ploigos_step_runner.utils.containers import container_registries_login DEFAULT_CONFIG = { - 'src-tls-verify': 'true', - 'dest-tls-verify': 'true', - 'containers-config-auth-file': os.path.join(Path.home(), '.container-image-repo-auth.json') + 'src-tls-verify': True, + 'dest-tls-verify': True, + 'container-image-pull-repository-type': 'containers-storage:', + 'container-image-push-repository-type': 'docker://' } REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ - 'containers-config-auth-file', 'destination-url', - 'src-tls-verify', + ['source-tls,verify', 'src-tls-verify'], 'dest-tls-verify', 'service-name', 'application-name', 'organization', - 'container-image-version', - 'image-tar-file' + + # being flexible for different use cases of proceeding steps + ['container-image-pull-tag', 'container-image-tag'], + ['container-image-pull-repository-type', 'container-image-repository-type'], + ['container-image-push-repository-type', 'container-image-repository-type'] ] class Skopeo(StepImplementer): @@ -117,55 +148,108 @@ def _run_step(self): """ step_result = StepResult.from_step_implementer(self) - image_version = self.get_value('container-image-version').lower() - application_name = self.get_value('application-name') - service_name = self.get_value('service-name') - organization = self.get_value('organization') - image_tar_file = self.get_value('image-tar-file') - destination_url = self.get_value('destination-url') + # get config + image_pull_tag = self.get_value(['container-image-pull-tag', 'container-image-tag']) + pull_repository_type = self.get_value([ + 'container-image-pull-repository-type', + 'container-image-repository-type' + ]) + push_repository_type = self.get_value([ + 'container-image-push-repository-type', + 'container-image-repository-type' + ]) dest_tls_verify = self.get_value('dest-tls-verify') - - image_registry_uri = destination_url - image_registry_organization = organization - image_repository = f"{application_name}-{service_name}" - image_tag = f"{image_registry_uri}/{image_registry_organization}" \ - f"/{image_repository}:{image_version}" + if isinstance(dest_tls_verify, str): + dest_tls_verify = bool(util.strtobool(dest_tls_verify)) + source_tls_verify = self.get_value(['source-tls-verify', 'src-tls-verify']) + if isinstance(source_tls_verify, str): + source_tls_verify = bool(util.strtobool(source_tls_verify)) + + # create push tag + image_version = self.get_value('container-image-version') + if image_version is None: + image_version = 'latest' + print('No image tag version found in metadata. Using latest') + image_version = image_version.lower() + image_registry_uri = self.get_value('destination-url') + image_registry_organization = self.get_value('organization') + image_repository = f"{self.get_value('application-name')}-{self.get_value('service-name')}" + image_push_short_tag = f"{image_registry_organization}/{image_repository}:{image_version}" + image_push_tag = f"{image_registry_uri}/{image_push_short_tag}" try: # login to any provider container registries # NOTE: important to specify the auth file because depending on the context this is # being run in python process may not have permissions to default location containers_config_auth_file = self.get_value('containers-config-auth-file') + if not containers_config_auth_file: + containers_config_auth_file = os.path.join( + self.work_dir_path, + 'container-auth.json' + ) container_registries_login( registries=self.get_value('container-registries'), containers_config_auth_file=containers_config_auth_file, - containers_config_tls_verify=util.strtobool(dest_tls_verify) + containers_config_tls_verify=dest_tls_verify ) # push image sh.skopeo.copy( # pylint: disable=no-member - f"--src-tls-verify={str(self.get_value('src-tls-verify'))}", - f"--dest-tls-verify={str(self.get_value('dest-tls-verify'))}", + f"--src-tls-verify={str(source_tls_verify).lower()}", + f"--dest-tls-verify={str(dest_tls_verify).lower()}", f"--authfile={containers_config_auth_file}", - f'docker-archive:{image_tar_file}', - f'docker://{image_tag}', + f'{pull_repository_type}{image_pull_tag}', + f'{push_repository_type}{image_push_tag}', _out=sys.stdout, _err=sys.stderr, _tee='err' ) except sh.ErrorReturnCode as error: step_result.success = False - step_result.message = f'Error pushing container image ({image_tar_file}) ' \ - f' to tag ({image_tag}) using skopeo: {error}' + step_result.message = f'Error pushing container image ({image_pull_tag}) ' \ + f' to tag ({image_push_tag}) using skopeo: {error}' - step_result.add_artifact(name='container-image-registry-uri', value=image_registry_uri) + # add artifacts + step_result.add_artifact( + name='container-image-registry-uri', + value=image_registry_uri, + description='Registry URI poriton of the container image tag' \ + ' of the pushed container image.' + ) step_result.add_artifact( name='container-image-registry-organization', - value=image_registry_organization + value=image_registry_organization, + description='Organization portion of the container image tag' \ + ' of the pushed container image.' + ) + step_result.add_artifact( + name='container-image-repository', + value=image_repository, + description='Repository portion of the container image tag' \ + ' of the pushed container image.' + ) + step_result.add_artifact( + name='container-image-name', + value=image_repository, + description='Another way to reference the' \ + ' repository portion of the container image tag of the pushed container image.' + ) + step_result.add_artifact( + name='container-image-version', + value=image_version, + description='Version portion of the container image tag of the pushed container image.' + ) + step_result.add_artifact( + name='container-image-tag', + value=image_push_tag, + description='Full container image tag of the pushed container,' \ + ' including the registry URI.' + ) + step_result.add_artifact( + name='container-image-short-tag', + value=image_push_short_tag, + description='Short container image tag of the pushed container image,' \ + ' excluding the registry URI.' ) - step_result.add_artifact(name='container-image-repository', value=image_repository) - step_result.add_artifact(name='container-image-name', value=image_repository) - step_result.add_artifact(name='container-image-version', value=image_version) - step_result.add_artifact(name='container-image-tag', value=image_tag) return step_result diff --git a/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py b/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py index 69e3d9ab..654e56cb 100644 --- a/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py +++ b/src/ploigos_step_runner/step_implementers/shared/openscap_generic.py @@ -9,16 +9,16 @@ Configuration Key | Required? | Default | Description -------------------------------|-----------|---------|----------- -`image-tar-file` | Yes | | Path to container image tar file to scan +`container-image-tag` | Yes | | Container image tag to scan. `oscap-input-definitions-uri` | Yes | | URI to the OpenSCAP definitions file \ - to do the evaluation with. \ - Must use protocol file://|http://|https://. \ - Must have file extension .xml|.bz2. + to do the evaluation with. \ + Must use protocol file://|http://|https://. \ + Must have file extension .xml|.bz2. `oscap-profile` | No | | OpenSCAP profile to evaluate. `oscap-tailoring-uri` | No | | URI to OpenSCAP tailoring file \ - to do the evaluation with. \ - Must use protocol file://|http://|https://. \ - Must have file extension .xml|.bz2. + to do the evaluation with. \ + Must use protocol file://|http://|https://. \ + Must have file extension .xml|.bz2. `oscap-fetch-remote-resources` | No | True | For Source DataStream and XCCDF files \ that have remote references fetch them if \ True, else don't. \ @@ -28,15 +28,12 @@ remote resources and this is not True. \ For disconnected environments the remote \ internal mirror. - -Expected Previous Step Results ------------------------------- - -Results expected from previous steps that this step requires. - -| Step Name | Result Key | Description -|--------------------------|------------------|-------------- -| `create-container-image` | `image-tar-file` | Image to scan +`[container-image-pull-repository-type, container-image-repository-type]` \ + | Yes | 'containers-storage:' \ + | \ + Container repository type for the pull image source. \ + See https://github.com/containers/skopeo for valid \ + options. Results ------- @@ -52,23 +49,32 @@ import os import re -import sys from distutils.util import strtobool from io import StringIO import sh from ploigos_step_runner import StepResult, StepRunnerException from ploigos_step_runner.step_implementer import StepImplementer -from ploigos_step_runner.utils.file import download_and_decompress_source_to_destination -from ploigos_step_runner.utils.io import create_sh_redirect_to_multiple_streams_fn_callback +from ploigos_step_runner.utils.containers import (create_container_from_image, + mount_container) +from ploigos_step_runner.utils.file import \ + download_and_decompress_source_to_destination +from ploigos_step_runner.utils.io import \ + create_sh_redirect_to_multiple_streams_fn_callback DEFAULT_CONFIG = { - 'oscap-fetch-remote-resources': True + 'oscap-fetch-remote-resources': True, + 'container-image-pull-repository-type': 'containers-storage:', + 'container-image-repository-type': 'containers-storage:' } REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ 'oscap-input-definitions-uri', - 'image-tar-file' + 'container-image-tag', + 'container-image-pull-repository-type', + + # being flexible for different use cases of proceeding steps + ['container-image-pull-repository-type', 'container-image-repository-type'] ] @@ -226,23 +232,23 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements """ step_result = StepResult.from_step_implementer(self) - image_tar_file = self.get_value('image-tar-file') - + # get config + image_tag = self.get_value('container-image-tag') oscap_profile = self.get_value('oscap-profile') oscap_fetch_remote_resources = self.get_value('oscap-fetch-remote-resources') - - # create a container name from the tar file name, step name, and sub step name - container_name = os.path.splitext(os.path.basename(image_tar_file))[0] - container_name += f"-{self.step_name}-{self.sub_step_name}" + pull_repository_type = self.get_value([ + 'container-image-pull-repository-type', + 'container-image-repository-type' + ]) try: - # import image tar file to vfs file system - print(f"\nImport image: {image_tar_file}") - OpenSCAPGeneric.__buildah_import_image_from_tar( - image_tar_file=image_tar_file, - container_name=container_name + # create container from image that can be mounted + print(f"\nCreate container from image ({image_tag})") + container_name = create_container_from_image( + image_tag=image_tag, + repository_type=pull_repository_type ) - print(f"Imported image: {image_tar_file}") + print(f"Created container ({container_name}) from image ({image_tag})") # baking `buildah unshare` command to wrap other buildah commands with # so that container does not need to be running in a privileged mode to be able @@ -254,7 +260,7 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements # NOTE: run in the context of `buildah unshare` so that container does not # need to be run in a privileged mode print(f"\nMount container: {container_name}") - container_mount_path = OpenSCAPGeneric.__buildah_mount_container( + container_mount_path = mount_container( buildah_unshare_command=buildah_unshare_command, container_id=container_name ) @@ -352,96 +358,12 @@ def _run_step(self): # pylint: disable=too-many-locals,too-many-statements name='stdout-report', value=oscap_out_file_path ) - except StepRunnerException as error: + except (StepRunnerException, RuntimeError) as error: step_result.success = False step_result.message = str(error) return step_result - @staticmethod - def __buildah_import_image_from_tar(image_tar_file, container_name): - """Import a container image using buildah form a TAR file. - - Parameters - ---------- - image_tar_file : str - Path to TAR file to import as a container image. - container_name : str - name for the working container. - - Returns - ------- - str - Name of the imported container. - - Raises - ------ - StepRunnerException - If error importing image. - """ - # import image tar file to vfs file system - try: - sh.buildah( # pylint: disable=no-member - 'from', - '--storage-driver', 'vfs', - '--name', container_name, - f"docker-archive:{image_tar_file}", - _out=sys.stdout, - _err=sys.stderr, - _tee='err' - ) - except sh.ErrorReturnCode as error: - raise StepRunnerException( - f'Error importing the image ({image_tar_file}): {error}' - ) from error - - return container_name - - @staticmethod - def __buildah_mount_container(buildah_unshare_command, container_id): - """Use buildah to mount a container. - - Parameters - ---------- - buildah_unshare_command : sh.buildah.unshare.bake() - A baked sh.buildah.unshare command to use to run this command in the context off - so that this can be done "rootless". - container_id : str - ID of the container to mount. - - Returns - ------- - str - Absolute path to the mounted container. - - Raises - ------ - StepRunnerException - If error mounting the container. - """ - mount_path = None - try: - buildah_mount_out_buff = StringIO() - buildah_mount_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([ - sys.stdout, - buildah_mount_out_buff - ]) - buildah_mount_command = buildah_unshare_command.bake("buildah", "mount") - buildah_mount_command( - '--storage-driver', 'vfs', - container_id, - _out=buildah_mount_out_callback, - _err=sys.stderr, - _tee='err' - ) - mount_path = buildah_mount_out_buff.getvalue().rstrip() - except sh.ErrorReturnCode as error: - raise StepRunnerException( - f'Error mounting container ({container_id}): {error}' - ) from error - - return mount_path - @staticmethod def __get_oscap_document_type(oscap_input_file): """Gets the OpenSCAP document type for a given input file. diff --git a/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py b/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py index a584e46b..186e70b9 100644 --- a/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py +++ b/src/ploigos_step_runner/step_implementers/sign_container_image/podman_sign.py @@ -41,6 +41,7 @@ `container-image-signature-signer-private-key-fingerprint` \ | Fingerprint for the private key used to sign \ the container image. +`container-image-signed-tag` | TODO `container-image-signature-file-path` | File path to created image signature. `container-image-signature-name` | Fully qualified name of the \ name of the image signature, \ @@ -75,7 +76,9 @@ # signer-pgp-private-key - new key name # container-image-signer-pgp-private-key - old key name ['signer-pgp-private-key', 'container-image-signer-pgp-private-key'], - 'container-image-tag' + + # being flexible for different use cases of proceeding steps + ['container-image-push-tag', 'container-image-tag'] ] class PodmanSign(StepImplementer): @@ -131,12 +134,11 @@ def _run_step(self): ) # get the uri to the image to sign - container_image_tag = self.get_value('container-image-tag') + container_image_tag = self.get_value(['container-image-push-tag', 'container-image-tag']) image_signatures_directory = self.create_working_dir_sub_dir( sub_dir_relative_path='image-signature' ) - try: # import the PGP key and get the finger print signer_pgp_private_key_fingerprint = import_pgp_key( @@ -166,6 +168,10 @@ def _run_step(self): image_signatures_directory=image_signatures_directory, container_image_tag=container_image_tag ) + step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) step_result.add_artifact( name='container-image-signature-file-path', value=signature_file_path, diff --git a/src/ploigos_step_runner/utils/containers.py b/src/ploigos_step_runner/utils/containers.py index 28003920..0aaf193d 100644 --- a/src/ploigos_step_runner/utils/containers.py +++ b/src/ploigos_step_runner/utils/containers.py @@ -2,9 +2,13 @@ """ import sys +from io import StringIO import sh from ploigos_step_runner.config.config_value import ConfigValue +from ploigos_step_runner.utils.io import \ + create_sh_redirect_to_multiple_streams_fn_callback + def container_registries_login( #pylint: disable=too-many-branches registries, @@ -275,3 +279,99 @@ def container_registry_login( #pylint: disable=too-many-arguments,too-many-branc f"Failed to login to container registry ({container_registry_uri}) " f"with username ({container_registry_username}): {error}" ) from error + +def create_container_from_image( + image_tag, + repository_type='container-storage:' +): + """Import a container image using buildah form a TAR file. + + Parameters + ---------- + image_tag : str + Image tag to create a container from. + ex: + * localhost/my-app:latest + * quay.io/my-org/my-app:latest + * docker-archive:/local/path/to/my-app-container-image.tar + container_name : str + name for the working container. + repository_type : str + The type of repository to mount the given image tag from. + See https://github.com/containers/skopeo for details on different repository types. + + Returns + ------- + str + Name of the imported container. + + Raises + ------ + RuntimeError + If error importing image. + """ + container_name = None + try: + buildah_from_out_buff = StringIO() + buildah_from_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([ + sys.stdout, + buildah_from_out_buff + ]) + sh.buildah( # pylint: disable=no-member + 'from', + f"{repository_type}{image_tag}", + _out=buildah_from_out_callback, + _err=sys.stderr, + _tee='err' + ) + container_name = buildah_from_out_buff.getvalue().rstrip() + except sh.ErrorReturnCode as error: + raise RuntimeError( + f'Error creating container from image ({image_tag}): {error}' + ) from error + + return container_name + + +def mount_container(buildah_unshare_command, container_id): + """Use buildah to mount a container. + + Parameters + ---------- + buildah_unshare_command : sh.buildah.unshare.bake() + A baked sh.buildah.unshare command to use to run this command in the context off + so that this can be done "rootless". + container_id : str + ID of the container to mount. + + Returns + ------- + str + Absolute path to the mounted container. + + Raises + ------ + RuntimeError + If error mounting the container. + """ + mount_path = None + try: + buildah_mount_out_buff = StringIO() + buildah_mount_out_callback = create_sh_redirect_to_multiple_streams_fn_callback([ + sys.stdout, + buildah_mount_out_buff + ]) + buildah_mount_command = buildah_unshare_command.bake("buildah", "mount") + buildah_mount_command( + container_id, + _out=buildah_mount_out_callback, + _err=sys.stderr, + _tee='err' + ) + mount_path = buildah_mount_out_buff.getvalue().rstrip() + except sh.ErrorReturnCode as error: + raise RuntimeError( + f'Error mounting container ({container_id}): {error}' + ) from error + + return mount_path diff --git a/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py b/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py index 8ebc501c..2fbab135 100644 --- a/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py +++ b/tests/step_implementers/container_image_static_compliance_scan/test_openscap_compliance.py @@ -1,14 +1,15 @@ import os import re -from testfixtures import TempDirectory -from tests.step_implementers.shared.test_openscap_generic import \ - TestStepImplementerSharedOpenSCAPGeneric +import tests.step_implementers.shared.test_openscap_generic from ploigos_step_runner.step_implementers.container_image_static_compliance_scan import \ OpenSCAP +from testfixtures import TempDirectory +from tests.helpers.base_step_implementer_test_case import \ + BaseStepImplementerTestCase -class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP(TestStepImplementerSharedOpenSCAPGeneric): +class BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP(BaseStepImplementerTestCase): def create_step_implementer( self, step_config={}, @@ -26,123 +27,52 @@ def create_step_implementer( parent_work_dir_path=parent_work_dir_path ) - def test__validate_required_config_or_previous_step_result_artifact_keys_valid(self): - step_config = { - 'oscap-input-definitions-uri': 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml', - 'oscap-profile': 'foo', - 'image-tar-file': 'does-not-matter' - } - - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - - def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_extension(self): - oscap_input_definitions_uri = 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml.foo' - step_config = { - 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'oscap-profile': 'foo', - 'image-tar-file': 'does-not-matter' - } - - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - with self.assertRaisesRegex( - AssertionError, - re.compile( - r'Open SCAP input definitions source ' - rf'\({oscap_input_definitions_uri}\) must be of known type \(xml\|bz2\), got: \.foo' - ) - ): - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - - def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_protocal(self): - oscap_input_definitions_uri = 'foo://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml' - step_config = { - 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'oscap-profile': 'foo', - 'image-tar-file': 'does-not-matter' - } - - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - with self.assertRaisesRegex( - AssertionError, - re.compile( - r'Open SCAP input definitions source ' - rf'\({oscap_input_definitions_uri}\) must start with known protocol ' - r'\(file://\|http://\|https://\)\.' - ) - ): - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - - def test__validate_required_config_or_previous_step_result_artifact_keys_missing_oscap_profile(self): - oscap_input_definitions_uri = 'https://atopathways.redhatgov.io/compliance-as-code/scap/ssg-rhel8-ds.xml' - step_config = { - 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'image-tar-file': 'does-not-matter' - } - - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - with self.assertRaisesRegex( - AssertionError, - re.compile( - r"Missing required step configuration or previous step result" - r" artifact keys: \['oscap-profile'\]" - ) - ): - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - - def test__validate_required_config_or_previous_step_result_artifact_keys_missing_required_keys(self): - step_config = {} - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - with self.assertRaisesRegex( - AssertionError, - re.compile( - r"Missing required step configuration or previous step result" - r" artifact keys: \['oscap-profile', 'oscap-input-definitions-uri', 'image-tar-file'\]" - ) - ): - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP_step_implementer_config_defaults( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults +): + pass + +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP__required_config_or_result_keys( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP +): + def test_result(self): + required_keys = OpenSCAP._required_config_or_result_keys() + expected_required_keys = [ + 'oscap-profile', + 'oscap-input-definitions-uri', + 'container-image-tag', + 'container-image-pull-repository-type', + ['container-image-pull-repository-type', 'container-image-repository-type'] + ] + self.assertEqual(required_keys, expected_required_keys) + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__validate_required_config_or_previous_step_result_artifact_keys( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys +): + pass + +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___get_oscap_document_type( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type +): + pass + +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___get_oscap_eval_type_based_on_document_type( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type +): + pass + +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP___run_oscap_scan( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan +): + pass + +class TestStepImplementerContainerImageStaticComplianceScanOpenSCAP__run_step( + BaseTestStepImplementerContainerImageStaticComplianceScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__run_step +): + pass diff --git a/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py b/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py index 0efeae61..31da9cec 100644 --- a/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py +++ b/tests/step_implementers/container_image_static_vulnerability_scan/test_openscap_vulnerability.py @@ -1,10 +1,11 @@ -from tests.step_implementers.shared.test_openscap_generic import \ - TestStepImplementerSharedOpenSCAPGeneric +import tests.step_implementers.shared.test_openscap_generic from ploigos_step_runner.step_implementers.container_image_static_vulnerability_scan import \ OpenSCAP +from tests.helpers.base_step_implementer_test_case import \ + BaseStepImplementerTestCase -class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP(TestStepImplementerSharedOpenSCAPGeneric): +class BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP(BaseStepImplementerTestCase): def create_step_implementer( self, step_config={}, @@ -21,3 +22,45 @@ def create_step_implementer( workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP_step_implementer_config_defaults( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__required_config_or_result_keys( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__required_config_or_result_keys +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__validate_required_config_or_previous_step_result_artifact_keys( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___get_oscap_document_type( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___get_oscap_eval_type_based_on_document_type( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP___run_oscap_scan( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan +): + pass + +class TestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP__run_step( + BaseTestStepImplementerContainerImageStaticVulnerabilityScanOpenSCAP, + tests.step_implementers.shared.test_openscap_generic.TestStepImplementerSharedOpenSCAPGeneric__run_step +): + pass diff --git a/tests/step_implementers/create_container_image/test_buildah_create_container_image.py b/tests/step_implementers/create_container_image/test_buildah_create_container_image.py index 9d028ef0..61caff3a 100644 --- a/tests/step_implementers/create_container_image/test_buildah_create_container_image.py +++ b/tests/step_implementers/create_container_image/test_buildah_create_container_image.py @@ -13,7 +13,7 @@ BaseStepImplementerTestCase -class TestStepImplementerCreateContainerImageBuildah(BaseStepImplementerTestCase): +class BaseTestStepImplementerCreateContainerImageBuildah(BaseStepImplementerTestCase): def create_step_implementer( self, step_config={}, @@ -31,11 +31,114 @@ def create_step_implementer( parent_work_dir_path=parent_work_dir_path ) -# TESTS FOR configuration checks - def test_step_implementer_config_defaults(self): +@patch("ploigos_step_runner.StepImplementer._validate_required_config_or_previous_step_result_artifact_keys") +class TestStepImplementerCreateContainerImageBuildah__validate_required_config_or_previous_step_result_artifact_keys( + BaseTestStepImplementerCreateContainerImageBuildah +): + def test_valid_defaults(self, mock_super_validate): + with TempDirectory() as test_dir: + # setup + parent_work_dir_path = os.path.join(test_dir.path, 'working') + step_config = { + 'context': test_dir.path, + 'organization': 'org-name', + 'service-name': 'service-name', + 'application-name': 'app-name' + } + test_dir.write('Containerfile',b'''testing''') + + # run test + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + ) + step_implementer._validate_required_config_or_previous_step_result_artifact_keys() + + # validate + mock_super_validate.assert_called_once_with() + + def test_valid_custom_imagespecfile(self, mock_super_validate): + with TempDirectory() as test_dir: + # setup + parent_work_dir_path = os.path.join(test_dir.path, 'working') + step_config = { + 'context': test_dir.path, + 'organization': 'org-name', + 'service-name': 'service-name', + 'application-name': 'app-name', + 'imagespecfile': 'MockContainerfile.ubi8' + } + test_dir.write('MockContainerfile.ubi8',b'''testing''') + + # run test + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + ) + step_implementer._validate_required_config_or_previous_step_result_artifact_keys() + + # validate + mock_super_validate.assert_called_once_with() + + def test_fail_missing_imagespecfile_defaults(self, mock_super_validate): + with TempDirectory() as test_dir: + # setup + parent_work_dir_path = os.path.join(test_dir.path, 'working') + step_config = { + 'context': test_dir.path, + 'organization': 'org-name', + 'service-name': 'service-name', + 'application-name': 'app-name' + } + + # run test + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + ) + with self.assertRaisesRegex( + AssertionError, + rf'Given imagespecfile \(Containerfile\) does not exist' + rf' in given context \({test_dir.path}\).' + ): + step_implementer._validate_required_config_or_previous_step_result_artifact_keys() + + # validate + mock_super_validate.assert_called_once_with() + + def test_fail_missing_imagespecfile_custom_imagespecfile(self, mock_super_validate): + with TempDirectory() as test_dir: + # setup + parent_work_dir_path = os.path.join(test_dir.path, 'working') + step_config = { + 'context': test_dir.path, + 'organization': 'org-name', + 'service-name': 'service-name', + 'application-name': 'app-name', + 'imagespecfile': 'MockContainerfile.ubi8' + } + + # run test + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + ) + with self.assertRaisesRegex( + AssertionError, + rf'Given imagespecfile \(MockContainerfile.ubi8\) does not exist' + rf' in given context \({test_dir.path}\).' + ): + step_implementer._validate_required_config_or_previous_step_result_artifact_keys() + + # validate + mock_super_validate.assert_called_once_with() + +class TestStepImplementerCreateContainerImageBuildah_step_implementer_config_defaults( + BaseTestStepImplementerCreateContainerImageBuildah +): + def test_result(self): defaults = Buildah.step_implementer_config_defaults() expected_defaults = { - 'containers-config-auth-file': os.path.join(Path.home(), '.buildah-auth.json'), 'imagespecfile': 'Containerfile', 'context': '.', 'tls-verify': True, @@ -43,22 +146,29 @@ def test_step_implementer_config_defaults(self): } self.assertEqual(defaults, expected_defaults) - def test__required_config_or_result_keys(self): +class TestStepImplementerCreateContainerImageBuildah___required_config_or_result_keys( + BaseTestStepImplementerCreateContainerImageBuildah +): + def test_result(self): required_keys = Buildah._required_config_or_result_keys() expected_required_keys = [ - 'containers-config-auth-file', 'imagespecfile', 'context', 'tls-verify', 'format', + 'organization', 'service-name', 'application-name' ] self.assertEqual(required_keys, expected_required_keys) +class TestStepImplementerCreateContainerImageBuildah___run_step( + BaseTestStepImplementerCreateContainerImageBuildah +): @patch('sh.buildah', create=True) - def test__run_step_pass(self, buildah_mock): + def test_pass(self, buildah_mock): with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') temp_dir.write('Containerfile',b'''testing''') @@ -68,11 +178,11 @@ def test__run_step_pass(self, buildah_mock): workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) step_config = { - 'containers-config-auth-file': 'buildah-auth.json', 'imagespecfile': 'Containerfile', 'context': temp_dir.path, 'tls-verify': True, 'format': 'oci', + 'organization': 'org-name', 'service-name': 'service-name', 'application-name': 'app-name' } @@ -84,109 +194,168 @@ def test__run_step_pass(self, buildah_mock): parent_work_dir_path=parent_work_dir_path ) - - + # run test result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='create-container-image', sub_step_name='Buildah', sub_step_implementer_name='Buildah' ) + expected_step_result.add_artifact( + name='container-image-registry-uri', + value='localhost', + description='Registry URI poriton of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-registry-organization', + value='org-name', + description='Organization portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-repository', + value='app-name-service-name', + description='Repository portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-name', + value='app-name-service-name', + description='Another way to reference the' \ + ' repository portion of the container image tag of the built container image.' + ) expected_step_result.add_artifact( name='container-image-version', - value='localhost/app-name/service-name:1.0-123abc' + value='1.0-123abc', + description='Version portion of the container image tag of the built container image.' ) expected_step_result.add_artifact( - name='image-tar-file', - value=f'{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar' + name='container-image-tag', + value='localhost/org-name/app-name-service-name:1.0-123abc', + description='Full container image tag of the built container,' \ + ' including the registry URI.' ) - + expected_step_result.add_artifact( + name='container-image-short-tag', + value='org-name/app-name-service-name:1.0-123abc', + description='Short container image tag of the built container image,' \ + ' excluding the registry URI.' + ) + self.assertEqual(result, expected_step_result) buildah_mock.bud.assert_called_once_with( '--format=oci', '--tls-verify=true', '--layers', '-f', 'Containerfile', - '-t', 'localhost/app-name/service-name:1.0-123abc', - '--authfile', 'buildah-auth.json', + '-t', 'localhost/org-name/app-name-service-name:1.0-123abc', + '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'), temp_dir.path, _out=sys.stdout, _err=sys.stderr, _tee='err' ) - buildah_mock.push.assert_called_once_with( - 'localhost/app-name/service-name:1.0-123abc', - f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar', - _out=sys.stdout, - _err=sys.stderr, - _tee='err' - ) - self.assertEqual(result, expected_step_result) - @patch('sh.buildah', create=True) - def test__run_step_pass_no_container_image_version(self, buildah_mock): + def test_pass_custom_auth_file(self, buildah_mock): with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') temp_dir.write('Containerfile',b'''testing''') + artifact_config = { + 'container-image-version': {'description': '', 'value': '1.0-123abc'}, + } + workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) + step_config = { - 'containers-config-auth-file': 'buildah-auth.json', 'imagespecfile': 'Containerfile', 'context': temp_dir.path, 'tls-verify': True, 'format': 'oci', + 'organization': 'org-name', 'service-name': 'service-name', - 'application-name': 'app-name' + 'application-name': 'app-name', + 'containers-config-auth-file': 'mock-auth.json' } - step_implementer = self.create_step_implementer( step_config=step_config, step_name='create-container-image', implementer='Buildah', - parent_work_dir_path=parent_work_dir_path, + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path ) + # run test result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='create-container-image', sub_step_name='Buildah', sub_step_implementer_name='Buildah' ) + expected_step_result.add_artifact( + name='container-image-registry-uri', + value='localhost', + description='Registry URI poriton of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-registry-organization', + value='org-name', + description='Organization portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-repository', + value='app-name-service-name', + description='Repository portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-name', + value='app-name-service-name', + description='Another way to reference the' \ + ' repository portion of the container image tag of the built container image.' + ) expected_step_result.add_artifact( name='container-image-version', - value='localhost/app-name/service-name:latest' + value='1.0-123abc', + description='Version portion of the container image tag of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-tag', + value='localhost/org-name/app-name-service-name:1.0-123abc', + description='Full container image tag of the built container,' \ + ' including the registry URI.' ) expected_step_result.add_artifact( - name='image-tar-file', - value=f'{step_implementer.work_dir_path}/image-app-name-service-name-latest.tar' + name='container-image-short-tag', + value='org-name/app-name-service-name:1.0-123abc', + description='Short container image tag of the built container image,' \ + ' excluding the registry URI.' ) + self.assertEqual(result, expected_step_result) buildah_mock.bud.assert_called_once_with( '--format=oci', '--tls-verify=true', '--layers', '-f', 'Containerfile', - '-t', 'localhost/app-name/service-name:latest', - '--authfile', 'buildah-auth.json', + '-t', 'localhost/org-name/app-name-service-name:1.0-123abc', + '--authfile', 'mock-auth.json', temp_dir.path, _out=sys.stdout, _err=sys.stderr, _tee='err' ) - buildah_mock.push.assert_called_once_with( - 'localhost/app-name/service-name:latest', - f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-latest.tar', - _out=sys.stdout, - _err=sys.stderr, - _tee='err' - ) - self.assertEqual(result, expected_step_result) - @patch('sh.buildah', create=True) - def test__run_step_pass_image_tar_file_exists(self, buildah_mock): + def test_pass_string_tls_verify(self, buildah_mock): with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') temp_dir.write('Containerfile',b'''testing''') @@ -196,11 +365,11 @@ def test__run_step_pass_image_tar_file_exists(self, buildah_mock): workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) step_config = { - 'containers-config-auth-file': 'buildah-auth.json', 'imagespecfile': 'Containerfile', 'context': temp_dir.path, - 'tls-verify': True, + 'tls-verify': 'true', 'format': 'oci', + 'organization': 'org-name', 'service-name': 'service-name', 'application-name': 'app-name' } @@ -212,57 +381,82 @@ def test__run_step_pass_image_tar_file_exists(self, buildah_mock): parent_work_dir_path=parent_work_dir_path ) - step_implementer.write_working_file('image-app-name-service-name-1.0-123abc.tar') - + # run test result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='create-container-image', sub_step_name='Buildah', sub_step_implementer_name='Buildah' ) + expected_step_result.add_artifact( + name='container-image-registry-uri', + value='localhost', + description='Registry URI poriton of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-registry-organization', + value='org-name', + description='Organization portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-repository', + value='app-name-service-name', + description='Repository portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-name', + value='app-name-service-name', + description='Another way to reference the' \ + ' repository portion of the container image tag of the built container image.' + ) expected_step_result.add_artifact( name='container-image-version', - value='localhost/app-name/service-name:1.0-123abc' + value='1.0-123abc', + description='Version portion of the container image tag of the built container image.' ) expected_step_result.add_artifact( - name='image-tar-file', - value=f'{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar' + name='container-image-tag', + value='localhost/org-name/app-name-service-name:1.0-123abc', + description='Full container image tag of the built container,' \ + ' including the registry URI.' ) - + expected_step_result.add_artifact( + name='container-image-short-tag', + value='org-name/app-name-service-name:1.0-123abc', + description='Short container image tag of the built container image,' \ + ' excluding the registry URI.' + ) + self.assertEqual(result, expected_step_result) buildah_mock.bud.assert_called_once_with( '--format=oci', '--tls-verify=true', '--layers', '-f', 'Containerfile', - '-t', 'localhost/app-name/service-name:1.0-123abc', - '--authfile', 'buildah-auth.json', + '-t', 'localhost/org-name/app-name-service-name:1.0-123abc', + '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'), temp_dir.path, _out=sys.stdout, _err=sys.stderr, _tee='err' ) - buildah_mock.push.assert_called_once_with( - 'localhost/app-name/service-name:1.0-123abc', - f'docker-archive:{step_implementer.work_dir_path}/image-app-name-service-name-1.0-123abc.tar', - _out=sys.stdout, - _err=sys.stderr, - _tee='err' - ) - self.assertEqual(result, expected_step_result) - @patch('sh.buildah', create=True) - def test__run_step_fail_no_image_spec_file(self, buildah_mock): + def test_pass_no_container_image_version(self, buildah_mock): with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - artifact_config = { - 'container-image-version': {'description': '', 'value': '1.0-123abc'}, - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - + temp_dir.write('Containerfile',b'''testing''') step_config = { + 'imagespecfile': 'Containerfile', + 'context': temp_dir.path, + 'tls-verify': True, + 'format': 'oci', + 'organization': 'org-name', 'service-name': 'service-name', 'application-name': 'app-name' } @@ -270,24 +464,75 @@ def test__run_step_fail_no_image_spec_file(self, buildah_mock): step_config=step_config, step_name='create-container-image', implementer='Buildah', - workflow_result=workflow_result, - parent_work_dir_path=parent_work_dir_path + parent_work_dir_path=parent_work_dir_path, ) + # run test result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='create-container-image', sub_step_name='Buildah', sub_step_implementer_name='Buildah' ) - expected_step_result.success = False - expected_step_result.message = 'Image specification file does not exist in location: ./Containerfile' - + expected_step_result.add_artifact( + name='container-image-registry-uri', + value='localhost', + description='Registry URI poriton of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-registry-organization', + value='org-name', + description='Organization portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-repository', + value='app-name-service-name', + description='Repository portion of the container image tag' \ + ' of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-name', + value='app-name-service-name', + description='Another way to reference the' \ + ' repository portion of the container image tag of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-version', + value='latest', + description='Version portion of the container image tag of the built container image.' + ) + expected_step_result.add_artifact( + name='container-image-tag', + value='localhost/org-name/app-name-service-name:latest', + description='Full container image tag of the built container,' \ + ' including the registry URI.' + ) + expected_step_result.add_artifact( + name='container-image-short-tag', + value='org-name/app-name-service-name:latest', + description='Short container image tag of the built container image,' \ + ' excluding the registry URI.' + ) self.assertEqual(result, expected_step_result) + buildah_mock.bud.assert_called_once_with( + '--format=oci', + '--tls-verify=true', + '--layers', '-f', 'Containerfile', + '-t', 'localhost/org-name/app-name-service-name:latest', + '--authfile', os.path.join(step_implementer.work_dir_path, 'container-auth.json'), + temp_dir.path, + _out=sys.stdout, + _err=sys.stderr, + _tee='err' + ) + @patch('sh.buildah', create=True) - def test__run_step_fail_buildah_bud_error(self, buildah_mock): + def test_fail_buildah_bud_error(self, buildah_mock): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') temp_dir.write('Containerfile',b'''testing''') @@ -299,7 +544,6 @@ def test__run_step_fail_buildah_bud_error(self, buildah_mock): image_spec_file = 'Containerfile' step_config = { - 'containers-config-auth-file': 'buildah-auth.json', 'imagespecfile': image_spec_file, 'context': temp_dir.path, 'tls-verify': True, @@ -333,57 +577,3 @@ def test__run_step_fail_buildah_bud_error(self, buildah_mock): re.DOTALL ) ) - - @patch('sh.buildah', create=True) - def test__run_step_fail_buildah_push_error(self, buildah_mock): - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - temp_dir.write('Containerfile',b'''testing''') - - application_name = 'app-name' - service_name = 'service-name' - image_tag_version = '1.0-123abc' - - artifact_config = { - 'container-image-version': {'description': '', 'value': image_tag_version}, - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - - step_config = { - 'containers-config-auth-file': 'buildah-auth.json', - 'imagespecfile': 'Containerfile', - 'context': temp_dir.path, - 'tl-sverify': 'true', - 'format': 'oci', - 'service-name': service_name, - 'application-name': application_name - } - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='create-container-image', - implementer='Buildah', - workflow_result=workflow_result, - parent_work_dir_path=parent_work_dir_path - ) - - buildah_mock.push.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error') - - result = step_implementer._run_step() - - image_tar_path = os.path.join( - step_implementer.work_dir_path, - f'image-{application_name}-{service_name}-{image_tag_version}.tar' - ) - self.assertFalse(result.success) - self.assertRegex( - result.message, - re.compile( - rf'Issue invoking buildah push to tar file \({image_tar_path}\):' - r'.*RAN: buildah' - r'.*STDOUT:' - r'.*mock out' - r'.*STDERR:' - r'.*mock error', - re.DOTALL - ) - ) \ No newline at end of file diff --git a/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py b/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py index d497e07f..169ddf6d 100644 --- a/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py +++ b/tests/step_implementers/push_container_image/test_skopeo_push_container_image.py @@ -1,22 +1,17 @@ -# pylint: disable=missing-module-docstring -# pylint: disable=missing-class-docstring -# pylint: disable=missing-function-docstring import os -import re from io import IOBase -from pathlib import Path from unittest.mock import patch import sh +from ploigos_step_runner import StepResult +from ploigos_step_runner.step_implementers.push_container_image import Skopeo from testfixtures import TempDirectory from tests.helpers.base_step_implementer_test_case import \ BaseStepImplementerTestCase from tests.helpers.test_utils import Any -from ploigos_step_runner.step_implementers.push_container_image import Skopeo -from ploigos_step_runner import StepResult -class TestStepImplementerSkopeoSourceBase(BaseStepImplementerTestCase): +class BaseTestStepImplementerSkopeoSourceBase(BaseStepImplementerTestCase): def create_step_implementer( self, step_config={}, @@ -34,122 +29,320 @@ def create_step_implementer( parent_work_dir_path=parent_work_dir_path ) - def test_step_implementer_config_defaults(self): +class TestStepImplementerSkopeoSourceBase_step_implementer_config_defaults( + BaseTestStepImplementerSkopeoSourceBase +): + def test_result(self): defaults = Skopeo.step_implementer_config_defaults() expected_defaults = { - 'containers-config-auth-file': os.path.join( - Path.home(), - '.container-image-repo-auth.json' - ), - 'dest-tls-verify': 'true', - 'src-tls-verify': 'true' + 'src-tls-verify': True, + 'dest-tls-verify': True, + 'container-image-pull-repository-type': 'containers-storage:', + 'container-image-push-repository-type': 'docker://' } self.assertEqual(defaults, expected_defaults) - def test__required_config_or_result_keys(self): +class TestStepImplementerSkopeoSourceBase___required_config_or_result_keys( + BaseTestStepImplementerSkopeoSourceBase +): + def test_result(self): required_keys = Skopeo._required_config_or_result_keys() expected_required_keys = [ - 'containers-config-auth-file', 'destination-url', - 'src-tls-verify', + ['source-tls,verify', 'src-tls-verify'], 'dest-tls-verify', 'service-name', 'application-name', 'organization', - 'container-image-version', - 'image-tar-file' + ['container-image-pull-tag', 'container-image-tag'], + ['container-image-pull-repository-type', 'container-image-repository-type'], + ['container-image-push-repository-type', 'container-image-repository-type'] ] self.assertEqual(required_keys, expected_required_keys) +class TestStepImplementerSkopeoSourceBase__run_step( + BaseTestStepImplementerSkopeoSourceBase +): + def __run_pass_test( + self, + step_config, + image_version, + image_pull_tag, + image_push_tag, + temp_dir, + skopeo_mock, + skopeo_mock_call_dest_tls_value='true', + skopeo_mock_call_src_tls_value='true', + expected_step_result_message=None, + containers_config_auth_file=None + ): + parent_work_dir_path = os.path.join(temp_dir.path, 'working') + + # setup step + step_implementer = self.create_step_implementer( + step_config=step_config, + step_name='push-container-image', + implementer='Skopeo', + parent_work_dir_path=parent_work_dir_path, + ) + + # run step + result = step_implementer._run_step() + + # verify + expected_step_result = StepResult( + step_name='push-container-image', + sub_step_name='Skopeo', + sub_step_implementer_name='Skopeo' + ) + if expected_step_result_message: + expected_step_result.message = expected_step_result_message + expected_step_result.add_artifact( + name='container-image-registry-uri', + value='fake-registry.xyz', + description='Registry URI poriton of the container image tag' \ + ' of the pushed container image.' + ) + expected_step_result.add_artifact( + name='container-image-registry-organization', + value='fake-org', + description='Organization portion of the container image tag' \ + ' of the pushed container image.' + ) + expected_step_result.add_artifact( + name='container-image-repository', + value='fake-app-fake-service', + description='Repository portion of the container image tag' \ + ' of the pushed container image.' + ) + expected_step_result.add_artifact( + name='container-image-name', + value='fake-app-fake-service', + description='Another way to reference the' \ + ' repository portion of the container image tag of the pushed container image.' + ) + expected_step_result.add_artifact( + name='container-image-version', + value=image_version, + description='Version portion of the container image tag of the pushed container image.' + ) + expected_step_result.add_artifact( + name='container-image-tag', + value=f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}', + description='Full container image tag of the pushed container,' \ + ' including the registry URI.' + ) + expected_step_result.add_artifact( + name='container-image-short-tag', + value=f'fake-org/fake-app-fake-service:{image_version}', + description='Short container image tag of the pushed container image,' \ + ' excluding the registry URI.' + ) + self.assertEqual(result, expected_step_result) + + if not containers_config_auth_file: + containers_config_auth_file = os.path.join( + step_implementer.work_dir_path, + 'container-auth.json' + ) + + skopeo_mock.copy.assert_called_once_with( + f"--src-tls-verify={skopeo_mock_call_src_tls_value}", + f"--dest-tls-verify={skopeo_mock_call_dest_tls_value}", + f"--authfile={containers_config_auth_file}", + f'containers-storage:{image_pull_tag}', + f'docker://{image_push_tag}', + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) + @patch.object(sh, 'skopeo', create=True) - def test_run_step_pass(self, skopeo_mock): + def test_pass(self, skopeo_mock): with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - image_tar_file = 'fake-image.tar' image_version = '1.0-69442c8' - image_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' step_config = { 'destination-url': 'fake-registry.xyz', 'service-name': 'fake-service', 'application-name': 'fake-app', 'organization': 'fake-org', 'container-image-version': image_version, - 'image-tar-file': image_tar_file + 'container-image-pull-tag': image_pull_tag } - step_implementer = self.create_step_implementer( + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, step_config=step_config, - step_name='push-container-image', - implementer='Skopeo', - parent_work_dir_path=parent_work_dir_path, + skopeo_mock=skopeo_mock ) - result = step_implementer._run_step() - - expected_step_result = StepResult( - step_name='push-container-image', - sub_step_name='Skopeo', - sub_step_implementer_name='Skopeo' - ) - expected_step_result.add_artifact( - name='container-image-registry-uri', - value='fake-registry.xyz' - ) - expected_step_result.add_artifact( - name='container-image-registry-organization', - value='fake-org' - ) - expected_step_result.add_artifact( - name='container-image-repository', - value='fake-app-fake-service' - ) - expected_step_result.add_artifact( - name='container-image-name', - value='fake-app-fake-service' + @patch.object(sh, 'skopeo', create=True) + def test_pass_default_version(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = 'latest' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-pull-tag': image_pull_tag + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock ) - expected_step_result.add_artifact( - name='container-image-version', - value=image_version + + + @patch.object(sh, 'skopeo', create=True) + def test_pass_string_destination_tls_truethy(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = '1.0-69442c8' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag, + 'dest-tls-verify': 'true' + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock ) - expected_step_result.add_artifact( - name='container-image-tag', - value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8' + + @patch.object(sh, 'skopeo', create=True) + def test_pass_string_destination_tls_falsy(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = '1.0-69442c8' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag, + 'dest-tls-verify': 'false' + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock, + skopeo_mock_call_dest_tls_value='false' ) - self.assertEqual( - result.get_step_result_dict(), - expected_step_result.get_step_result_dict() + + @patch.object(sh, 'skopeo', create=True) + def test_pass_string_source_tls_truthy(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = '1.0-69442c8' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag, + 'src-tls-verify': 'true' + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock ) - containers_config_auth_file = os.path.join( - Path.home(), - '.container-image-repo-auth.json' + @patch.object(sh, 'skopeo', create=True) + def test_pass_string_source_tls_falsy(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = '1.0-69442c8' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag, + 'src-tls-verify': 'false' + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock, + skopeo_mock_call_src_tls_value='false' ) - skopeo_mock.copy.assert_called_once_with( - "--src-tls-verify=true", - "--dest-tls-verify=true", - f"--authfile={containers_config_auth_file}", - f'docker-archive:{image_tar_file}', - f'docker://{image_tag}', - _out=Any(IOBase), - _err=Any(IOBase), - _tee='err' + + @patch.object(sh, 'skopeo', create=True) + def test_pass_custom_auth_file(self, skopeo_mock): + with TempDirectory() as temp_dir: + image_version = '1.0-69442c8' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + step_config = { + 'destination-url': 'fake-registry.xyz', + 'service-name': 'fake-service', + 'application-name': 'fake-app', + 'organization': 'fake-org', + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag, + 'containers-config-auth-file': 'mock-auth.json' + } + self.__run_pass_test( + temp_dir=temp_dir, + image_version=image_version, + image_pull_tag=image_pull_tag, + image_push_tag=image_push_tag, + step_config=step_config, + skopeo_mock=skopeo_mock, + containers_config_auth_file='mock-auth.json' ) @patch.object(sh, 'skopeo', create=True) - def test_run_step_fail_run_skopeo(self, skopeo_mock): + def test_fail_run_skopeo(self, skopeo_mock): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') - image_tar_file = 'fake-image.tar' + # setup step image_version = '1.0-69442c8' - image_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' + image_pull_tag = f'localhost/fake-org/fake-app-fake-service:{image_version}' + image_push_tag = f'fake-registry.xyz/fake-org/fake-app-fake-service:{image_version}' step_config = { 'destination-url': 'fake-registry.xyz', 'service-name': 'fake-service', 'application-name': 'fake-app', 'organization': 'fake-org', 'container-image-version': image_version, - 'image-tar-file': image_tar_file + 'container-image-version': image_version, + 'container-image-pull-tag': image_pull_tag } step_implementer = self.create_step_implementer( step_config=step_config, @@ -158,9 +351,11 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock): parent_work_dir_path=parent_work_dir_path, ) + # run step (mock fail) skopeo_mock.copy.side_effect = sh.ErrorReturnCode('skopeo', b'mock stdout', b'mock error') result = step_implementer._run_step() + # verify expected_step_result = StepResult( step_name='push-container-image', sub_step_name='Skopeo', @@ -168,31 +363,48 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock): ) expected_step_result.add_artifact( name='container-image-registry-uri', - value='fake-registry.xyz' + value='fake-registry.xyz', + description='Registry URI poriton of the container image tag' \ + ' of the pushed container image.' ) expected_step_result.add_artifact( name='container-image-registry-organization', - value='fake-org' + value='fake-org', + description='Organization portion of the container image tag' \ + ' of the pushed container image.' ) expected_step_result.add_artifact( name='container-image-repository', - value='fake-app-fake-service' + value='fake-app-fake-service', + description='Repository portion of the container image tag' \ + ' of the pushed container image.' ) expected_step_result.add_artifact( name='container-image-name', - value='fake-app-fake-service' + value='fake-app-fake-service', + description='Another way to reference the' \ + ' repository portion of the container image tag of the pushed container image.' ) expected_step_result.add_artifact( name='container-image-version', - value=image_version + value=image_version, + description='Version portion of the container image tag of the pushed container image.' ) expected_step_result.add_artifact( name='container-image-tag', - value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8' + value='fake-registry.xyz/fake-org/fake-app-fake-service:1.0-69442c8', + description='Full container image tag of the pushed container,' \ + ' including the registry URI.' + ) + expected_step_result.add_artifact( + name='container-image-short-tag', + value='fake-org/fake-app-fake-service:1.0-69442c8', + description='Short container image tag of the pushed container image,' \ + ' excluding the registry URI.' ) expected_step_result.success = False - expected_step_result.message = f"Error pushing container image ({image_tar_file}) " +\ - f" to tag ({image_tag}) using skopeo: \n" +\ + expected_step_result.message = f"Error pushing container image ({image_pull_tag}) " +\ + f" to tag ({image_push_tag}) using skopeo: \n" +\ f"\n" +\ f" RAN: skopeo\n" +\ f"\n" +\ @@ -201,19 +413,18 @@ def test_run_step_fail_run_skopeo(self, skopeo_mock): f"\n" +\ f" STDERR:\n" +\ f"mock error" - self.assertEqual(result, expected_step_result) containers_config_auth_file = os.path.join( - Path.home(), - '.container-image-repo-auth.json' + step_implementer.work_dir_path, + 'container-auth.json' ) skopeo_mock.copy.assert_called_once_with( "--src-tls-verify=true", "--dest-tls-verify=true", f"--authfile={containers_config_auth_file}", - f'docker-archive:{image_tar_file}', - f'docker://{image_tag}', + f'containers-storage:{image_pull_tag}', + f'docker://{image_push_tag}', _out=Any(IOBase), _err=Any(IOBase), _tee='err' diff --git a/tests/step_implementers/shared/test_openscap_generic.py b/tests/step_implementers/shared/test_openscap_generic.py index c78b880f..063a4396 100644 --- a/tests/step_implementers/shared/test_openscap_generic.py +++ b/tests/step_implementers/shared/test_openscap_generic.py @@ -1,6 +1,3 @@ -# pylint: disable=missing-module-docstring -# pylint: disable=missing-class-docstring -# pylint: disable=missing-function-docstring import os import re from contextlib import redirect_stdout @@ -17,7 +14,7 @@ from ploigos_step_runner.step_implementers.shared.openscap_generic import OpenSCAPGeneric -class TestStepImplementerSharedOpenSCAPGeneric(BaseStepImplementerTestCase): +class BaseTestStepImplementerSharedOpenSCAPGeneric(BaseStepImplementerTestCase): def create_step_implementer( self, step_config={}, @@ -35,22 +32,37 @@ def create_step_implementer( parent_work_dir_path=parent_work_dir_path ) - def test_step_implementer_config_defaults(self): +class TestStepImplementerSharedOpenSCAPGeneric_step_implementer_config_defaults( + BaseTestStepImplementerSharedOpenSCAPGeneric +): + + def test_result(self): defaults = OpenSCAPGeneric.step_implementer_config_defaults() expected_defaults = { - 'oscap-fetch-remote-resources': True + 'oscap-fetch-remote-resources': True, + 'container-image-pull-repository-type': 'containers-storage:', + 'container-image-repository-type': 'containers-storage:' } self.assertEqual(defaults, expected_defaults) - def test__required_config_or_result_keys(self): +class TestStepImplementerSharedOpenSCAPGeneric__required_config_or_result_keys( + BaseTestStepImplementerSharedOpenSCAPGeneric +): + def test_result(self): required_keys = OpenSCAPGeneric._required_config_or_result_keys() expected_required_keys = [ 'oscap-input-definitions-uri', - 'image-tar-file' + 'container-image-tag', + 'container-image-pull-repository-type', + ['container-image-pull-repository-type', 'container-image-repository-type'] ] self.assertEqual(required_keys, expected_required_keys) - def test__validate_required_config_or_previous_step_result_artifact_keys_valid(self): +@patch("ploigos_step_runner.StepImplementer._validate_required_config_or_previous_step_result_artifact_keys") +class TestStepImplementerSharedOpenSCAPGeneric__validate_required_config_or_previous_step_result_artifact_keys( + BaseTestStepImplementerSharedOpenSCAPGeneric +): + def test__valid(self, mock_super_validate): step_config = { 'oscap-input-definitions-uri': 'https://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2', 'image-tar-file': 'does-not-matter' @@ -68,7 +80,9 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_valid(s step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_protocal(self): + mock_super_validate.assert_called_once_with() + + def test_invalid_protocal(self, mock_super_validate): oscap_input_definitions_uri = 'foo://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, @@ -94,7 +108,9 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_invalid ): step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - def test__validate_required_config_or_previous_step_result_artifact_keys_invalid_extension(self): + mock_super_validate.assert_called_once_with() + + def test_invalid_extension(self, mock_super_validate): oscap_input_definitions_uri = 'https://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.foo' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, @@ -120,144 +136,13 @@ def test__validate_required_config_or_previous_step_result_artifact_keys_invalid ): step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - def test__validate_required_config_or_previous_step_result_artifact_keys_missing_required_keys(self): - step_config = {} - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - - step_implementer = self.create_step_implementer( - step_config=step_config, - step_name='test', - implementer='OpenSCAP', - parent_work_dir_path=parent_work_dir_path - ) - - with self.assertRaisesRegex( - AssertionError, - re.compile( - r"Missing required step configuration or previous step result" - r" artifact keys: \['oscap-input-definitions-uri', 'image-tar-file'\]" - ) - ): - step_implementer._validate_required_config_or_previous_step_result_artifact_keys() - - @patch('sh.buildah', create=True) - def test___buildah_import_image_from_tar_success(self, buildah_mock): - image_tar_file = "/does/not/matter.tar" - container_name = "test" - - OpenSCAPGeneric._OpenSCAPGeneric__buildah_import_image_from_tar( - image_tar_file=image_tar_file, - container_name=container_name - ) - - buildah_mock.assert_called_once_with( - 'from', - '--storage-driver', 'vfs', - '--name', container_name, - f"docker-archive:{image_tar_file}", - _out=Any(IOBase), - _err=Any(IOBase), - _tee='err' - ) - - @patch('sh.buildah', create=True) - def test___buildah_import_image_from_tar_error(self, buildah_mock): - image_tar_file = "/does/not/matter.tar" - container_name = "test" - - buildah_mock.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error') - - with self.assertRaisesRegex( - StepRunnerException, - re.compile( - rf"Error importing the image \({image_tar_file}\):" - r".*RAN: buildah" - r".*STDOUT:" - r".*mock out" - r".*STDERR:" - r".*mock error", - re.DOTALL - ) - ): - OpenSCAPGeneric._OpenSCAPGeneric__buildah_import_image_from_tar( - image_tar_file=image_tar_file, - container_name=container_name - ) - - buildah_mock.assert_called_once_with( - 'from', - '--storage-driver', 'vfs', - '--name', container_name, - f"docker-archive:{image_tar_file}", - _out=Any(IOBase), - _err=Any(IOBase), - _tee='err' - ) - - @patch('sh.buildah', create=True) - def test___buildah_mount_container_success(self, buildah_mock): - buildah_unshare_command = sh.buildah.bake('unshare') - container_name = "test" - - expected_mount_path = '/this/is/a/path' - buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = create_sh_side_effect( - mock_stdout=f"{expected_mount_path}", - ) - - container_mount_path = OpenSCAPGeneric._OpenSCAPGeneric__buildah_mount_container( - buildah_unshare_command=buildah_unshare_command, - container_id=container_name - ) - - self.assertEqual(container_mount_path, expected_mount_path) - - buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with( - '--storage-driver', 'vfs', - container_name, - _out=Any(IOBase), - _err=Any(IOBase), - _tee='err' - ) - - @patch('sh.buildah', create=True) - def test___buildah_mount_container_error(self, buildah_mock): - buildah_unshare_command = sh.buildah.bake('unshare') - container_name = "test" - - buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = sh.ErrorReturnCode( - 'buildah mount', - b'mock out', - b'mock error' - ) - - with self.assertRaisesRegex( - StepRunnerException, - re.compile( - rf"Error mounting container \({container_name}\):" - r".*RAN: buildah" - r".*STDOUT:" - r".*mock out" - r".*STDERR:" - r".*mock error", - re.DOTALL - ) - ): - OpenSCAPGeneric._OpenSCAPGeneric__buildah_mount_container( - buildah_unshare_command=buildah_unshare_command, - container_id=container_name - ) - - buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with( - '--storage-driver', 'vfs', - container_name, - _out=Any(IOBase), - _err=Any(IOBase), - _tee='err' - ) + mock_super_validate.assert_called_once_with() +class TestStepImplementerSharedOpenSCAPGeneric___get_oscap_document_type( + BaseTestStepImplementerSharedOpenSCAPGeneric +): @patch('sh.oscap', create=True) - def test___get_oscap_document_type_sds(self, oscap_mock): + def test_sds(self, oscap_mock): oscap_input_file = '/does/not/matter.xml' sh.oscap.info.side_effect = create_sh_side_effect( @@ -293,7 +178,7 @@ def test___get_oscap_document_type_sds(self, oscap_mock): self.assertEqual(oscap_document_type, 'Source Data Stream') @patch('sh.oscap', create=True) - def test___get_oscap_document_type_oval(self, oscap_mock): + def test_oval(self, oscap_mock): oscap_input_file = '/does/not/matter.xml' sh.oscap.info.side_effect = create_sh_side_effect( @@ -315,7 +200,7 @@ def test___get_oscap_document_type_oval(self, oscap_mock): self.assertEqual(oscap_document_type, 'OVAL Definitions') @patch('sh.oscap', create=True) - def test___get_oscap_document_type_xccdf(self, oscap_mock): + def test_xccdf(self, oscap_mock): oscap_input_file = '/does/not/matter.xml' sh.oscap.info.side_effect = create_sh_side_effect( @@ -356,7 +241,7 @@ def test___get_oscap_document_type_xccdf(self, oscap_mock): self.assertEqual(oscap_document_type, 'XCCDF Checklist') @patch('sh.oscap', create=True) - def test___get_oscap_document_type_error(self, oscap_mock): + def test_error(self, oscap_mock): oscap_input_file = '/does/not/matter.xml' sh.oscap.info.side_effect = sh.ErrorReturnCode( @@ -387,7 +272,10 @@ def test___get_oscap_document_type_error(self, oscap_mock): _out=Any(IOBase) ) - def test___get_oscap_eval_type_based_on_document_type_sds(self): +class TestStepImplementerSharedOpenSCAPGeneric___get_oscap_eval_type_based_on_document_type( + BaseTestStepImplementerSharedOpenSCAPGeneric +): + def test_sds(self): oscap_document_type = 'Source Data Stream' oscap_eval_type = \ @@ -397,7 +285,7 @@ def test___get_oscap_eval_type_based_on_document_type_sds(self): self.assertEqual(oscap_eval_type, 'xccdf') - def test___get_oscap_eval_type_based_on_document_type_xccdf(self): + def test_xccdf(self): oscap_document_type = 'XCCDF Checklist' oscap_eval_type = \ @@ -407,7 +295,7 @@ def test___get_oscap_eval_type_based_on_document_type_xccdf(self): self.assertEqual(oscap_eval_type, 'xccdf') - def test___get_oscap_eval_type_based_on_document_type_oval(self): + def test_oval(self): oscap_document_type = 'OVAL Definitions' oscap_eval_type = \ @@ -417,7 +305,7 @@ def test___get_oscap_eval_type_based_on_document_type_oval(self): self.assertEqual(oscap_eval_type, 'oval') - def test___get_oscap_eval_type_based_on_document_type_unknown(self): + def test_unknown(self): oscap_document_type = 'What is this nonsense?' oscap_eval_type = \ @@ -427,7 +315,10 @@ def test___get_oscap_eval_type_based_on_document_type_unknown(self): self.assertEqual(oscap_eval_type, None) - def __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( +class TestStepImplementerSharedOpenSCAPGeneric___run_oscap_scan( + BaseTestStepImplementerSharedOpenSCAPGeneric +): + def __run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( self, buildah_mock, oscap_eval_type, @@ -528,10 +419,8 @@ def __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( self.assertEqual(stdout, oscap_stdout_expected) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_xccdf_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -560,10 +449,8 @@ def test___run_oscap_scan_xccdf_do_no_fetch_remote_with_profile_all_pass(self, b ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_all_pass(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_xccdf_do_yes_fetch_remote_with_profile_all_pass(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -592,10 +479,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_all_pass(self, ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile=None, @@ -624,10 +509,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_no_profile_all_pass(self, bu ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_with_fail(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_xccdf_do_yes_fetch_remote_with_profile_with_fail(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -674,10 +557,8 @@ def test___run_oscap_scan_xccdf_do_yes_fetch_remote_with_profile_with_fail(self, ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_str_oscap_fetch_remote_resources_flag(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_str_oscap_fetch_remote_resources_flag(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -706,10 +587,8 @@ def test___run_oscap_scan_str_oscap_fetch_remote_resources_flag(self, buildah_mo ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='oval', oscap_fetch_remote_resources=False, @@ -727,10 +606,8 @@ def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, bu ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_oval_do_no_fetch_remote_with_profile_all_pass(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='oval', oscap_fetch_remote_resources=False, @@ -752,7 +629,7 @@ def test___run_oscap_scan_oval_do_no_fetch_remote_with_profile_all_pass(self, bu ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock): + def test_xccdf_exit_code_1(self, buildah_mock): with self.assertRaisesRegex( StepRunnerException, re.compile( @@ -776,9 +653,7 @@ def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock): re.DOTALL ) ): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -819,7 +694,7 @@ def test___run_oscap_scan_xccdf_exit_code_1(self, buildah_mock): ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock): + def test_oval_exit_code_1(self, buildah_mock): with self.assertRaisesRegex( StepRunnerException, re.compile( @@ -835,9 +710,7 @@ def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock): re.DOTALL ) ): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='oval', oscap_fetch_remote_resources=False, @@ -857,7 +730,7 @@ def test___run_oscap_scan_oval_exit_code_1(self, buildah_mock): ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock): + def test_oval_exit_code_2(self, buildah_mock): with self.assertRaisesRegex( StepRunnerException, re.compile( @@ -873,9 +746,7 @@ def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock): re.DOTALL ) ): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='oval', oscap_fetch_remote_resources=False, @@ -895,7 +766,7 @@ def test___run_oscap_scan_oval_exit_code_2(self, buildah_mock): ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock): + def test_oval_exit_code_unknown(self, buildah_mock): with self.assertRaisesRegex( StepRunnerException, re.compile( @@ -911,9 +782,7 @@ def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock): re.DOTALL ) ): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='oval', oscap_fetch_remote_resources=False, @@ -933,10 +802,8 @@ def test___run_oscap_scan_oval_exit_code_unknown(self, buildah_mock): ) @patch('sh.buildah', create=True) - def test___run_oscap_scan_xccdf_with_tailoring_file(self, buildah_mock): - TestStepImplementerSharedOpenSCAPGeneric. \ - __run_test___run_oscap_scan_xccdf_do_not_fetch_remote_with_profile_all_pass( - self, + def test_xccdf_with_tailoring_file(self, buildah_mock): + self.__run_test_xccdf_do_not_fetch_remote_with_profile_all_pass( buildah_mock=buildah_mock, oscap_eval_type='xccdf', oscap_profile='this.is.real.i.sware', @@ -965,57 +832,62 @@ def test___run_oscap_scan_xccdf_with_tailoring_file(self, buildah_mock): """ ) +class TestStepImplementerSharedOpenSCAPGeneric__run_step( + BaseTestStepImplementerSharedOpenSCAPGeneric +): @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan') - @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container') + @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container') + @patch( + 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image', + return_value='mock-image-working-container-mock-1' + ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type') @patch('sh.buildah', create=True) - def test_run_step_pass( + def test_pass( self, buildah_mock, get_oscap_document_type_mock, - buildah_mount_container_mock, + create_container_from_image_mock, + mount_container_mock, run_oscap_scan_mock ): oscap_document_type = 'Source Data Stream' oscap_eval_type = 'xccdf' oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2' + container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'oscap-profile': 'foo' + 'oscap-profile': 'foo', + 'container-image-tag': container_image_tag } - image_tar_file_name = 'my_awesome_app' - image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar' oscap_eval_success = True oscap_eval_fails = None with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') mount_path = '/does/not/matter/container-mount' - artifact_config = { - 'image-tar-file': {'description': '', 'value': image_tar_file} - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - step_implementer = self.create_step_implementer( step_config=step_config, step_name='test', implementer='OpenSCAP', - workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) get_oscap_document_type_mock.return_value = oscap_document_type - buildah_mount_container_mock.return_value = mount_path + mount_container_mock.return_value = mount_path run_oscap_scan_mock.return_value = [ oscap_eval_success, oscap_eval_fails ] + # run test stdout_buff = StringIO() with redirect_stdout(stdout_buff): step_result = step_implementer._run_step() + # verify results expected_results = StepResult( step_name='test', sub_step_name='OpenSCAP', @@ -1037,14 +909,14 @@ def test_run_step_pass( self.assertEqual(expected_results, step_result) stdout = stdout_buff.getvalue() - + expected_container_name = 'mock-image-working-container-mock-1' self.assertRegex( stdout, re.compile( - rf".*Import image: {image_tar_file}" - rf".*Imported image: {image_tar_file}" - rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*" - rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'" + rf".*Create container from image \({container_image_tag}\)" + rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)" + rf".*Mount container: {expected_container_name}" + rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'" rf".*Download input definitions: {oscap_input_definitions_uri}" rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml" rf".*Determine OpenSCAP document type of input file: /.+/working/test/rhel\-8\.ds\.xml" @@ -1058,25 +930,30 @@ def test_run_step_pass( ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan') - @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container') + @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container') + @patch( + 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image', + return_value='mock-image-working-container-mock-1' + ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type') @patch('sh.buildah', create=True) - def test_run_step_fail( + def test_fail( self, buildah_mock, get_oscap_document_type_mock, - buildah_mount_container_mock, + create_container_from_image_mock, + mount_container_mock, run_oscap_scan_mock ): oscap_document_type = 'Source Data Stream' oscap_eval_type = 'xccdf' oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2' + container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'oscap-profile': 'foo' + 'oscap-profile': 'foo', + 'container-image-tag': container_image_tag } - image_tar_file_name = 'my_awesome_app' - image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar' oscap_eval_success = False oscap_eval_fails = """ Title Install dnf-automatic Package @@ -1089,25 +966,20 @@ def test_run_step_fail( parent_work_dir_path = os.path.join(temp_dir.path, 'working') mount_path = '/does/not/matter/container-mount' - artifact_config = { - 'image-tar-file': {'description': '', 'value': image_tar_file} - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - step_implementer = self.create_step_implementer( step_config=step_config, step_name='test', implementer='OpenSCAP', - workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) get_oscap_document_type_mock.return_value = oscap_document_type - buildah_mount_container_mock.return_value = mount_path + mount_container_mock.return_value = mount_path run_oscap_scan_mock.return_value = [ oscap_eval_success, oscap_eval_fails ] + stdout_buff = StringIO() with redirect_stdout(stdout_buff): step_result = step_implementer._run_step() @@ -1137,14 +1009,14 @@ def test_run_step_fail( self.assertEqual(expected_results, step_result) stdout = stdout_buff.getvalue() - + expected_container_name = 'mock-image-working-container-mock-1' self.assertRegex( stdout, re.compile( - rf".*Import image: {image_tar_file}" - rf".*Imported image: {image_tar_file}" - rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*" - rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'" + rf".*Create container from image \({container_image_tag}\)" + rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)" + rf".*Mount container: {expected_container_name}" + rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'" rf".*Download input definitions: {oscap_input_definitions_uri}" rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml" rf".*Determine OpenSCAP document type of input file: /.+/working/test/rhel\-8\.ds\.xml" @@ -1158,27 +1030,32 @@ def test_run_step_fail( ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan') - @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container') + @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container') + @patch( + 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image', + return_value='mock-image-working-container-mock-1' + ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type') @patch('sh.buildah', create=True) - def test_run_step_pass_with_tailoring_file( + def test_pass_with_tailoring_file( self, buildah_mock, get_oscap_document_type_mock, - buildah_mount_container_mock, + create_container_from_image_mock, + mount_container_mock, run_oscap_scan_mock ): oscap_document_type = 'Source Data Stream' oscap_eval_type = 'xccdf' oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2' + container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock' oscap_tailoring_uri = 'https://raw.githubusercontent.com/ploigos/ploigos-example-oscap-content/main/xccdf_com.redhat.ploigos_profile_example_ubi8-tailoring-xccdf.xml' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, 'oscap-tailoring-uri': oscap_tailoring_uri, - 'oscap-profile': 'foo' + 'oscap-profile': 'foo', + 'container-image-tag': container_image_tag } - image_tar_file_name = 'my_awesome_app' - image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar' oscap_eval_success = True oscap_eval_fails = None @@ -1186,21 +1063,15 @@ def test_run_step_pass_with_tailoring_file( parent_work_dir_path = os.path.join(temp_dir.path, 'working') mount_path = '/does/not/matter/container-mount' - artifact_config = { - 'image-tar-file': {'description': '', 'value': image_tar_file} - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - step_implementer = self.create_step_implementer( step_config=step_config, step_name='test', implementer='OpenSCAP', - workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) get_oscap_document_type_mock.return_value = oscap_document_type - buildah_mount_container_mock.return_value = mount_path + mount_container_mock.return_value = mount_path run_oscap_scan_mock.return_value = [ oscap_eval_success, oscap_eval_fails @@ -1208,7 +1079,7 @@ def test_run_step_pass_with_tailoring_file( stdout_buff = StringIO() with redirect_stdout(stdout_buff): - result = step_implementer._run_step() + step_result = step_implementer._run_step() expected_results = StepResult( step_name='test', @@ -1228,16 +1099,18 @@ def test_run_step_pass_with_tailoring_file( name='stdout-report', value=f"{step_implementer.work_dir_path}/oscap-xccdf-out" ) - self.assertEqual(expected_results, result) + self.assertEqual(expected_results, step_result) stdout = stdout_buff.getvalue() + + expected_container_name = 'mock-image-working-container-mock-1' self.assertRegex( stdout, re.compile( - rf".*Import image: {image_tar_file}" - rf".*Imported image: {image_tar_file}" - rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*" - rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'" + rf".*Create container from image \({container_image_tag}\)" + rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)" + rf".*Mount container: {expected_container_name}" + rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'" rf".*Download input definitions: {oscap_input_definitions_uri}" rf".*Downloaded input definitions to: /.+/working/test/rhel\-8.ds.xml" rf".*Download oscap tailoring file: {oscap_tailoring_uri}" @@ -1254,25 +1127,31 @@ def test_run_step_pass_with_tailoring_file( @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.download_and_decompress_source_to_destination') @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan') - @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container') + @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container') + @patch( + 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image', + return_value='mock-image-working-container-mock-1' + ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type') @patch('sh.buildah', create=True) - def test_run_step_fail_downloading_open_scap_input_file( + def test_fail_downloading_open_scap_input_file( self, buildah_mock, get_oscap_document_type_mock, - buildah_mount_container_mock, + create_container_from_image_mock, + mount_container_mock, run_oscap_scan_mock, download_and_decompress_source_to_destination_mock ): oscap_document_type = 'Source Data Stream' + oscap_eval_type = 'xccdf' oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2' + container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, - 'oscap-profile': 'foo' + 'oscap-profile': 'foo', + 'container-image-tag': container_image_tag } - image_tar_file_name = 'my_awesome_app' - image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar' oscap_eval_success = True oscap_eval_fails = None @@ -1280,21 +1159,15 @@ def test_run_step_fail_downloading_open_scap_input_file( parent_work_dir_path = os.path.join(temp_dir.path, 'working') mount_path = '/does/not/matter/container-mount' - artifact_config = { - 'image-tar-file': {'description': '', 'value': image_tar_file} - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - step_implementer = self.create_step_implementer( step_config=step_config, step_name='test', implementer='OpenSCAP', - workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) get_oscap_document_type_mock.return_value = oscap_document_type - buildah_mount_container_mock.return_value = mount_path + mount_container_mock.return_value = mount_path run_oscap_scan_mock.return_value = [ oscap_eval_success, oscap_eval_fails @@ -1305,10 +1178,12 @@ def test_run_step_fail_downloading_open_scap_input_file( mock_error_msg ) + # run test stdout_buff = StringIO() with redirect_stdout(stdout_buff): step_result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='test', sub_step_name='OpenSCAP', @@ -1317,18 +1192,17 @@ def test_run_step_fail_downloading_open_scap_input_file( expected_step_result.success = False expected_step_result.message = 'Error downloading OpenSCAP input file: ' \ f'{mock_error_msg}' - - self.assertEqual(step_result.get_step_result_dict(), expected_step_result.get_step_result_dict()) + self.assertEqual(expected_step_result, step_result) stdout = stdout_buff.getvalue() - + expected_container_name = 'mock-image-working-container-mock-1' self.assertRegex( stdout, re.compile( - rf".*Import image: {image_tar_file}" - rf".*Imported image: {image_tar_file}" - rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*" - rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'" + rf".*Create container from image \({container_image_tag}\)" + rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)" + rf".*Mount container: {expected_container_name}" + rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'" rf".*Download input definitions: {oscap_input_definitions_uri}", re.DOTALL ) @@ -1336,54 +1210,56 @@ def test_run_step_fail_downloading_open_scap_input_file( @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.download_and_decompress_source_to_destination') @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__run_oscap_scan') - @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__buildah_mount_container') + @patch('ploigos_step_runner.step_implementers.shared.openscap_generic.mount_container') + @patch( + 'ploigos_step_runner.step_implementers.shared.openscap_generic.create_container_from_image', + return_value='mock-image-working-container-mock-1' + ) @patch.object(OpenSCAPGeneric, '_OpenSCAPGeneric__get_oscap_document_type') @patch('sh.buildah', create=True) - def test_run_step_fail_downloading_open_scap_tailoring_file( + def test_fail_downloading_open_scap_tailoring_file( self, buildah_mock, get_oscap_document_type_mock, - buildah_mount_container_mock, + create_container_from_image_mock, + mount_container_mock, run_oscap_scan_mock, download_and_decompress_source_to_destination_mock ): oscap_document_type = 'Source Data Stream' + oscap_eval_type = 'xccdf' oscap_input_definitions_uri = 'https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2' + container_image_tag = 'localhost/mock-org/mock-image:v0.42.0-mock' oscap_tailoring_uri = 'https://raw.githubusercontent.com/ploigos/ploigos-example-oscap-content/main/xccdf_com.redhat.ploigos_profile_example_ubi8-tailoring-xccdf.xml' step_config = { 'oscap-input-definitions-uri': oscap_input_definitions_uri, + 'oscap-profile': 'foo', 'oscap-tailoring-uri': oscap_tailoring_uri, - 'oscap-profile': 'foo' + 'container-image-tag': container_image_tag } - image_tar_file_name = 'my_awesome_app' - image_tar_file = f'/does/not/matter/{image_tar_file_name}.tar' oscap_eval_success = True oscap_eval_fails = None with TempDirectory() as temp_dir: + # setup test parent_work_dir_path = os.path.join(temp_dir.path, 'working') mount_path = '/does/not/matter/container-mount' - artifact_config = { - 'image-tar-file': {'description': '', 'value': image_tar_file} - } - workflow_result = self.setup_previous_result(parent_work_dir_path, artifact_config) - step_implementer = self.create_step_implementer( step_config=step_config, step_name='test', implementer='OpenSCAP', - workflow_result=workflow_result, parent_work_dir_path=parent_work_dir_path ) get_oscap_document_type_mock.return_value = oscap_document_type - buildah_mount_container_mock.return_value = mount_path + mount_container_mock.return_value = mount_path run_oscap_scan_mock.return_value = [ oscap_eval_success, oscap_eval_fails ] + # run test with mock error mock_error_msg = 'mock error downloading open scap file' download_and_decompress_source_to_destination_mock.side_effect = [ "foo", @@ -1394,6 +1270,7 @@ def test_run_step_fail_downloading_open_scap_tailoring_file( with redirect_stdout(stdout_buff): step_result = step_implementer._run_step() + # verify results expected_step_result = StepResult( step_name='test', sub_step_name='OpenSCAP', @@ -1402,19 +1279,20 @@ def test_run_step_fail_downloading_open_scap_tailoring_file( expected_step_result.success = False expected_step_result.message = 'Error downloading OpenSCAP tailoring file: ' \ f'{mock_error_msg}' - - self.assertEqual(step_result.get_step_result_dict(), expected_step_result.get_step_result_dict()) + self.assertEqual(expected_step_result, step_result) stdout = stdout_buff.getvalue() - + expected_container_name = 'mock-image-working-container-mock-1' self.assertRegex( stdout, re.compile( - rf".*Import image: {image_tar_file}" - rf".*Imported image: {image_tar_file}" - rf".*Mount container: {image_tar_file_name}\-test\-OpenSCAP.*" - rf".*Mounted container \({image_tar_file_name}\-test\-OpenSCAP.*\) with mount path: '{mount_path}'" - rf".*Download input definitions: {oscap_input_definitions_uri}", + rf".*Create container from image \({container_image_tag}\)" + rf".*Created container \({expected_container_name}\) from image \({container_image_tag}\)" + rf".*Mount container: {expected_container_name}" + rf".*Mounted container \({expected_container_name}\) with mount path: '{mount_path}'" + rf".*Download input definitions: {oscap_input_definitions_uri}" + rf".*Downloaded input definitions to: foo" + rf".*Download oscap tailoring file: {oscap_tailoring_uri}", re.DOTALL ) ) diff --git a/tests/step_implementers/sign_container_image/test_podman_sign.py b/tests/step_implementers/sign_container_image/test_podman_sign.py index f5b275db..7cb10025 100644 --- a/tests/step_implementers/sign_container_image/test_podman_sign.py +++ b/tests/step_implementers/sign_container_image/test_podman_sign.py @@ -97,7 +97,7 @@ def test__required_config_or_result_keys(self): required_keys = PodmanSign._required_config_or_result_keys() expected_required_keys = [ ['signer-pgp-private-key', 'container-image-signer-pgp-private-key'], - 'container-image-tag' + ['container-image-push-tag', 'container-image-tag'] ] self.assertEqual(required_keys, expected_required_keys) @@ -162,12 +162,15 @@ def sign_image_side_effect( containers_config_tls_verify=True, container_command_short_name='podman' ) - expected_step_result = StepResult( step_name='sign-container-image', sub_step_name='PodmanSign', sub_step_implementer_name='PodmanSign' ) + expected_step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) expected_step_result.add_artifact( name='container-image-signature-file-path', value= os.path.join( @@ -256,6 +259,10 @@ def sign_image_side_effect( sub_step_name='PodmanSign', sub_step_implementer_name='PodmanSign' ) + expected_step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) expected_step_result.add_artifact( name='container-image-signature-file-path', value= os.path.join( @@ -359,6 +366,14 @@ def sign_image_side_effect( sub_step_name='PodmanSign', sub_step_implementer_name='PodmanSign' ) + expected_step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) + expected_step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) expected_step_result.add_artifact( name='container-image-signature-file-path', value= os.path.join( @@ -463,6 +478,10 @@ def sign_image_side_effect( sub_step_name='PodmanSign', sub_step_implementer_name='PodmanSign' ) + expected_step_result.add_artifact( + name='container-image-signed-tag', + value=container_image_tag, + ) expected_step_result.add_artifact( name='container-image-signature-file-path', value= os.path.join( @@ -487,9 +506,8 @@ def sign_image_side_effect( expected_step_result.success = False expected_step_result.message = 'mock upload error' - print(expected_step_result) - print(result) self.assertEqual(expected_step_result, result) + @patch('ploigos_step_runner.step_implementers.sign_container_image.podman_sign.import_pgp_key') @patch.object(PodmanSign, '_PodmanSign__sign_image') def test_run_step_fail_import_pgp_key(self, sign_image_mock, import_pgp_key_mock): @@ -538,8 +556,6 @@ def sign_image_side_effect( expected_step_result.success = False expected_step_result.message = 'mock error importing pgp key' - print(expected_step_result) - print(result) self.assertEqual(expected_step_result, result) @patch('ploigos_step_runner.step_implementers.sign_container_image.podman_sign.import_pgp_key') diff --git a/tests/utils/test_containers.py b/tests/utils/test_containers.py index 0d7d211c..c7a6d2cd 100644 --- a/tests/utils/test_containers.py +++ b/tests/utils/test_containers.py @@ -4,8 +4,7 @@ import sh from ploigos_step_runner.config import ConfigValue -from ploigos_step_runner.utils.containers import (container_registries_login, - container_registry_login) +from ploigos_step_runner.utils.containers import * from tests.helpers.base_test_case import BaseTestCase from tests.helpers.test_utils import * @@ -992,3 +991,145 @@ def test_list_of_dicts_falsey(self, container_registry_login_mock): ) ] container_registry_login_mock.assert_has_calls(calls) + +class Test_create_container_from_image(BaseTestCase): + @patch('sh.buildah', create=True) + def test_success_default_repository_type(self, buildah_mock): + # setup test + image_tag = "localhost/mock-org/mock-image:v0.42.0-mock" + + # run test + buildah_mock.side_effect = create_sh_side_effect( + mock_stdout='mock-image-working-container-mock' + ) + actual_container_name = create_container_from_image( + image_tag=image_tag + ) + + # verify + self.assertEqual(actual_container_name, 'mock-image-working-container-mock') + + buildah_mock.assert_called_once_with( + 'from', + f"container-storage:{image_tag}", + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) + + @patch('sh.buildah', create=True) + def test_success_remote_repository_type(self, buildah_mock): + # setup test + image_tag = "quay.io/mock-org/mock-image:v0.42.0-mock" + + # run test + buildah_mock.side_effect = create_sh_side_effect( + mock_stdout='mock-image-working-container-mock' + ) + actual_container_name = create_container_from_image( + image_tag=image_tag, + repository_type='docker://' + ) + + # verify + self.assertEqual(actual_container_name, 'mock-image-working-container-mock') + + buildah_mock.assert_called_once_with( + 'from', + f"docker://{image_tag}", + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) + + @patch('sh.buildah', create=True) + def test_error(self, buildah_mock): + # setup test + image_tag = "localhost/mock-org/mock-image:v0.42.0-mock" + + # run test with mock error + with self.assertRaisesRegex( + RuntimeError, + re.compile( + rf"Error creating container from image \({image_tag}\):" + r".*RAN: buildah" + r".*STDOUT:" + r".*mock out" + r".*STDERR:" + r".*mock error", + re.DOTALL + ) + ): + buildah_mock.side_effect = sh.ErrorReturnCode('buildah', b'mock out', b'mock error') + create_container_from_image( + image_tag=image_tag + ) + + # verify + buildah_mock.assert_called_once_with( + 'from', + f"container-storage:{image_tag}", + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) + +class Test_mount_container(BaseTestCase): + @patch('sh.buildah', create=True) + def test_success(self, buildah_mock): + buildah_unshare_command = sh.buildah.bake('unshare') + container_name = "test" + + expected_mount_path = '/this/is/a/path' + buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = create_sh_side_effect( + mock_stdout=f"{expected_mount_path}", + ) + + container_mount_path = mount_container( + buildah_unshare_command=buildah_unshare_command, + container_id=container_name + ) + + self.assertEqual(container_mount_path, expected_mount_path) + + buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with( + container_name, + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) + + @patch('sh.buildah', create=True) + def test_buildah_error(self, buildah_mock): + buildah_unshare_command = sh.buildah.bake('unshare') + container_name = "test" + + buildah_mock.bake('unshare').bake('buildah', 'mount').side_effect = sh.ErrorReturnCode( + 'buildah mount', + b'mock out', + b'mock error' + ) + + with self.assertRaisesRegex( + RuntimeError, + re.compile( + rf"Error mounting container \({container_name}\):" + r".*RAN: buildah" + r".*STDOUT:" + r".*mock out" + r".*STDERR:" + r".*mock error", + re.DOTALL + ) + ): + mount_container( + buildah_unshare_command=buildah_unshare_command, + container_id=container_name + ) + + buildah_mock.bake('unshare').bake('buildah', 'mount').assert_called_once_with( + container_name, + _out=Any(IOBase), + _err=Any(IOBase), + _tee='err' + ) \ No newline at end of file