From 9b6b572d6bc4b3757a926783a3ebe111d8f5a724 Mon Sep 17 00:00:00 2001 From: John Thompkins Date: Fri, 25 Jun 2021 11:44:49 -0600 Subject: [PATCH] rekor refactor --- .../automated_governance/__init__.py | 3 - .../automated_governance/rekor.py | 200 ----------- .../generate_evidence/rekor_sign_evidence.py | 27 ++ .../step_implementers/report/__init__.py | 2 + .../report/rekor_sign_report.py | 27 ++ .../step_implementers/shared/__init__.py | 1 + .../shared/rekor_sign_generic.py | 285 +++++++++++++++ src/ploigos_step_runner/utils/file.py | 51 ++- src/ploigos_step_runner/utils/pgp.py | 42 ++- .../automated_governance/test_rekor.py | 217 ------------ .../test_generate_evidence.py | 32 +- .../generate_evidence/test_rekor_evidence.py | 41 +++ .../report/test_rekor_report.py | 41 +++ .../shared/test_rekor_sign_generic.py | 327 ++++++++++++++++++ tests/utils/test_file.py | 123 +++++-- tests/utils/test_pgp.py | 94 ++++- 16 files changed, 1044 insertions(+), 469 deletions(-) delete mode 100644 src/ploigos_step_runner/step_implementers/automated_governance/__init__.py delete mode 100644 src/ploigos_step_runner/step_implementers/automated_governance/rekor.py create mode 100644 src/ploigos_step_runner/step_implementers/generate_evidence/rekor_sign_evidence.py create mode 100644 src/ploigos_step_runner/step_implementers/report/rekor_sign_report.py create mode 100644 src/ploigos_step_runner/step_implementers/shared/rekor_sign_generic.py delete mode 100644 tests/step_implementers/automated_governance/test_rekor.py create mode 100644 tests/step_implementers/generate_evidence/test_rekor_evidence.py create mode 100644 tests/step_implementers/report/test_rekor_report.py create mode 100644 tests/step_implementers/shared/test_rekor_sign_generic.py diff --git a/src/ploigos_step_runner/step_implementers/automated_governance/__init__.py b/src/ploigos_step_runner/step_implementers/automated_governance/__init__.py deleted file mode 100644 index 358aa10e..00000000 --- a/src/ploigos_step_runner/step_implementers/automated_governance/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""`StepImplementers` for the `rekor` step. -""" -from ploigos_step_runner.step_implementers.automated_governance.rekor import Rekor diff --git a/src/ploigos_step_runner/step_implementers/automated_governance/rekor.py b/src/ploigos_step_runner/step_implementers/automated_governance/rekor.py deleted file mode 100644 index 7df3c10e..00000000 --- a/src/ploigos_step_runner/step_implementers/automated_governance/rekor.py +++ /dev/null @@ -1,200 +0,0 @@ -"""`StepImplementer` for the `automated-governance` step using Rekor. - -Step Configuration ------------------- -Step configuration expected as input to this step. -Could come from: - - * static configuration - * runtime configuration - * previous step results - -Configuration Key | Required? | Default | Description --------------------|-----------|---------|----------- -`rekor-server-url` | Yes | | URL for Rekor server to upload artifact(s) to. -`signer-pgp-public-key-path` \ - | Yes | | DEPRECIATED. Will be removed in next release in favor \ - of automatically getting public key from private key. \ - Path to PGP public key corresponding to private key \ - that will be used to sign the Rekor artifact. -`signer-pgp-private-key-user` \ - | Yes | | DEPRECIATED. Will be removed in next release in favor \ - of `signer-pgp-private-key` which will import a given \ - PGP private key and automatically determine the \ - PGP private key fingerprint. - -Result Artifacts ----------------- -Results artifacts output by this step. - -Result Artifact Key | Description ---------------------|------------ -`rekor-uuid` | UUID of Rekor entry created by artifact upload. -`rekor-entry` | DEPRECIATED. Will likely be removed in next release. -""" - -import json -import os -import sys -from io import StringIO -from pathlib import Path - -import sh -from ploigos_step_runner import StepImplementer, StepResult -from ploigos_step_runner.utils.file import base64_encode, get_file_hash -from ploigos_step_runner.utils.io import \ - create_sh_redirect_to_multiple_streams_fn_callback -from ploigos_step_runner.utils.pgp import detach_sign_with_pgp_key - - -DEFAULT_CONFIG = {} - -REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ - 'rekor-server-url', - 'signer-pgp-public-key-path', - 'signer-pgp-private-key-user' -] -class Rekor(StepImplementer): # pylint: disable=too-few-public-methods - """`StepImplementer` for the `automated-governance` step using Rekor. - """ - - @staticmethod - def step_implementer_config_defaults(): - """Getter for the StepImplementer's configuration defaults. - - Returns - ------- - dict - Default values to use for step configuration values. - - Notes - ----- - These are the lowest precedence configuration values. - - """ - return DEFAULT_CONFIG - - @staticmethod - def _required_config_or_result_keys(): - """Getter for step configuration or previous step result artifacts that are required before - running this step. - - See Also - -------- - _validate_required_config_or_previous_step_result_artifact_keys - - Returns - ------- - array_list - Array of configuration keys or previous step result artifacts - that are required before running the step. - """ - return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS - - @staticmethod - def create_rekor_entry( - signer_pgp_public_key_path, - signer_pgp_private_key_user, - extra_data_file - ): - """TODO: function will be refactored in next release. will doc then. - """ - file_hash = get_file_hash(extra_data_file) - sig_file = extra_data_file + '.asc' - sig_file_path = Path(sig_file) - if sig_file_path.exists(): - sig_file_path.unlink() - detach_sign_with_pgp_key( - output_signature_path=sig_file, - file_to_sign_path=extra_data_file, - pgp_private_key_fingerprint=signer_pgp_private_key_user - ) - base64_encoded_extra_data = base64_encode(extra_data_file) - rekor_entry = { - "kind": "rekord", - "apiVersion": "0.0.1", - "spec": { - "signature": { - "format": "pgp", - "content": base64_encode(sig_file), - "publicKey": { - "content": base64_encode(signer_pgp_public_key_path) - } - }, - "data": { - "content": base64_encoded_extra_data, - "hash": { - "algorithm": "sha256", - "value": file_hash - } - }, - "extraData": base64_encoded_extra_data - } - } - return rekor_entry - - def upload_to_rekor( - self, - rekor_server, - extra_data_file, - signer_pgp_public_key_path, - signer_pgp_private_key_user - ): - """TODO: function will be refactored in next release. will doc then. - """ - rekor_entry = Rekor.create_rekor_entry( - signer_pgp_public_key_path=signer_pgp_public_key_path, - signer_pgp_private_key_user=signer_pgp_private_key_user, - extra_data_file=extra_data_file - ) - - rekor_entry_path = self.write_working_file( - filename='entry.json', - contents=bytes(json.dumps(rekor_entry), 'utf-8') - ) - - rekor_upload_stdout_result = StringIO() - rekor_upload_stdout_callback = create_sh_redirect_to_multiple_streams_fn_callback([ - sys.stdout, - rekor_upload_stdout_result - ]) - rekor = sh.rekor( # pylint: disable=no-member - 'upload', - '--rekor_server', - rekor_server, - '--entry', - rekor_entry_path, - _out=rekor_upload_stdout_callback, - _err_to_out=True, - _tee='out' - ) - rekor_uuid = str(rekor).split('/')[-1].strip(' \n') - return rekor_entry, rekor_uuid - - def _run_step(self): - """Runs the step implemented by this StepImplementer. - - Returns - ------- - StepResult - Object containing the dictionary results of this step. - """ - step_result = StepResult.from_step_implementer(self) - rekor_server = self.get_value('rekor-server-url') - extra_data_file = os.path.join(self.work_dir_path, self.step_name+'.json') - self.workflow_result.write_results_to_json_file(extra_data_file) - rekor_entry, rekor_uuid = self.upload_to_rekor( - rekor_server=rekor_server, - extra_data_file=extra_data_file, - signer_pgp_public_key_path=self.get_value('signer-pgp-public-key-path'), - signer_pgp_private_key_user=self.get_value('signer-pgp-private-key-user') - ) - step_result.add_artifact( - name='rekor-entry', - value=rekor_entry - ) - step_result.add_artifact( - name='rekor-uuid', - value=rekor_uuid - ) - return step_result diff --git a/src/ploigos_step_runner/step_implementers/generate_evidence/rekor_sign_evidence.py b/src/ploigos_step_runner/step_implementers/generate_evidence/rekor_sign_evidence.py new file mode 100644 index 00000000..6412bad3 --- /dev/null +++ b/src/ploigos_step_runner/step_implementers/generate_evidence/rekor_sign_evidence.py @@ -0,0 +1,27 @@ +"""Implementation of RekorGeneric in Generate Evidence step. + +artifact_to_sign_uri_config_key is set to `evidence-uri` as +this is the key used to obtain the artifact that needs to be +signed by rekor. + +""" + +from ploigos_step_runner.step_implementers.shared.rekor_sign_generic import RekorSignGeneric + +class RekorSignEvidence(RekorSignGeneric): + """ Rekor implementation specific to generate_evidence + """ + def __init__( # pylint: disable=too-many-arguments + self, + workflow_result, + parent_work_dir_path, + config, + environment=None + ): + super().__init__( + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + config=config, + environment=environment, + artifact_to_sign_uri_config_key='evidence-uri' + ) diff --git a/src/ploigos_step_runner/step_implementers/report/__init__.py b/src/ploigos_step_runner/step_implementers/report/__init__.py index 3f5a71d1..43a7f601 100644 --- a/src/ploigos_step_runner/step_implementers/report/__init__.py +++ b/src/ploigos_step_runner/step_implementers/report/__init__.py @@ -3,3 +3,5 @@ from ploigos_step_runner.step_implementers.report.result_artifacts_archive import \ ResultArtifactsArchive +from ploigos_step_runner.step_implementers.report.rekor_sign_report import \ + RekorSignReport diff --git a/src/ploigos_step_runner/step_implementers/report/rekor_sign_report.py b/src/ploigos_step_runner/step_implementers/report/rekor_sign_report.py new file mode 100644 index 00000000..3d7efc51 --- /dev/null +++ b/src/ploigos_step_runner/step_implementers/report/rekor_sign_report.py @@ -0,0 +1,27 @@ +"""Implementation of RekorGeneric in the Report step. + +artifact_to_sign_uri_config_key is set to `results-archive-uri` as +this is the key used to obtain the artifact that needs to be +signed by rekor. + +""" + +from ploigos_step_runner.step_implementers.shared.rekor_sign_generic import RekorSignGeneric + +class RekorSignReport(RekorSignGeneric): + """ Rekor implementation specific to report + """ + def __init__( # pylint: disable=too-many-arguments + self, + workflow_result, + parent_work_dir_path, + config, + environment=None + ): + super().__init__( + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + config=config, + environment=environment, + artifact_to_sign_uri_config_key='results-archive-uri' + ) diff --git a/src/ploigos_step_runner/step_implementers/shared/__init__.py b/src/ploigos_step_runner/step_implementers/shared/__init__.py index 58e55c19..365c438f 100644 --- a/src/ploigos_step_runner/step_implementers/shared/__init__.py +++ b/src/ploigos_step_runner/step_implementers/shared/__init__.py @@ -3,3 +3,4 @@ from ploigos_step_runner.step_implementers.shared.maven_generic import MavenGeneric from ploigos_step_runner.step_implementers.shared.openscap_generic import OpenSCAPGeneric +from ploigos_step_runner.step_implementers.shared.rekor_sign_generic import RekorSignGeneric diff --git a/src/ploigos_step_runner/step_implementers/shared/rekor_sign_generic.py b/src/ploigos_step_runner/step_implementers/shared/rekor_sign_generic.py new file mode 100644 index 00000000..ea7195a8 --- /dev/null +++ b/src/ploigos_step_runner/step_implementers/shared/rekor_sign_generic.py @@ -0,0 +1,285 @@ +"""`StepImplementer` for the `automated-governance` step using Rekor. + +Step Configuration +------------------ +Step configuration expected as input to this step. +Could come from: + + * static configuration + * runtime configuration + * previous step results + +Configuration Key | Required? | Default | Description +------------------- |-----------|---------|----------- +`rekor-server-url` | Yes | | URL for Rekor server to upload artifact(s) to. +`signer-pgp-private-key`| Yes | | PGP Private Key used to sign the image + +Result Artifacts +---------------- +Results artifacts output by this step. + +Result Artifact Key | Description +--------------------|------------ +`rekor-uuid` | UUID of Rekor entry created by artifact upload. +`rekor-entry` | DEPRECIATED. Will likely be removed in next release. +""" + +from base64 import b64encode +import json +import sys +from io import StringIO +import sh +from ploigos_step_runner import StepImplementer, StepResult +from ploigos_step_runner.utils.file import base64_encode, get_file_hash,\ + download_source_to_destination +from ploigos_step_runner.utils.io import \ + create_sh_redirect_to_multiple_streams_fn_callback +from ploigos_step_runner.utils.pgp import detach_sign_with_pgp_key +from ploigos_step_runner.utils.pgp import import_pgp_key +from ploigos_step_runner.utils.pgp import export_pgp_public_key + + +DEFAULT_CONFIG = {} + +REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ + 'rekor-server-url', + 'signer-pgp-private-key' +] +class RekorSignGeneric(StepImplementer): # pylint: disable=too-few-public-methods + """`StepImplementer` for the generic Rekor class. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + workflow_result, + parent_work_dir_path, + config, + artifact_to_sign_uri_config_key, + environment=None + ): + self.__artifact_to_sign_uri_config_key = artifact_to_sign_uri_config_key + super().__init__( + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + config=config, + environment=environment + ) + + @property + def artifact_to_sign_uri_config_key(self): + """Getter for the artifact to sign config key. + + Returns + ------- + str + Config key that is used to obtain the artifact + that needs to be signed + """ + return self.__artifact_to_sign_uri_config_key + + @staticmethod + def step_implementer_config_defaults(): + """Getter for the StepImplementer's configuration defaults. + + Returns + ------- + dict + Default values to use for step configuration values. + + Notes + ----- + These are the lowest precedence configuration values. + + """ + return DEFAULT_CONFIG + + @staticmethod + def _required_config_or_result_keys(): + """Getter for step configuration or previous step result artifacts that are required before + running this step. + + See Also + -------- + _validate_required_config_or_previous_step_result_artifact_keys + + Returns + ------- + array_list + Array of configuration keys or previous step result artifacts + that are required before running the step. + """ + return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS + + def _create_rekor_entry( # pylint: disable=no-self-use + self, + signer_pgp_public_key, + signer_pgp_private_key_fingerprint, + path_to_file, + artifact_to_sign_uri + ): + """Method to generate a rekor entry + + Parameters + ---------- + signer_pgp_public_key: str + Public key obtained from the private key fingerprint + signer_pgp_private_key_fingerprint: str + PGP fingerprint obtained from importing the private key + path_to_file: str + Path to file to be signed + artifact_to_sign_uri: str + URI where artifact was pulled from + + Returns + ------- + dict + Dictionary containing the rekor generated entry. + + + """ + + file_hash = get_file_hash(path_to_file) + sig_file = path_to_file + '.asc' + + base64_public_key = b64encode(signer_pgp_public_key.encode('utf-8')).decode('utf-8') + + detach_sign_with_pgp_key( + output_signature_path=sig_file, + file_to_sign_path=path_to_file, + pgp_private_key_fingerprint=signer_pgp_private_key_fingerprint + ) + + #Base 64 encode the file + + with open(path_to_file, 'rb') as file_bytes: + base64_encoded_data = b64encode(file_bytes.read()).decode('utf-8') + + rekor_entry = { + "kind": "rekord", + "apiVersion": "0.0.1", + "spec": { + "signature": { + "format": "pgp", + "content": base64_encode(sig_file), + "publicKey": { + "content": base64_public_key + } + }, + "data": { + "content": base64_encoded_data, + "hash": { + "algorithm": "sha256", + "value": file_hash + } + }, + "extraData": { + "signed-artifact-uri": artifact_to_sign_uri + + } + } + } + return rekor_entry + + def _upload_to_rekor( + self, + rekor_server, + rekor_entry + ): + """Method to upload a rekor entry to provided rekor server + + Parameters + ---------- + rekor_server: str + URL to rekor server + signer_pgp_private_key_fingerprint: str + PGP fingerprint obtained from importing the private key + path_to_file: str + Path to file to be signed + artifact_to_sign_uri: str + URI where artifact was pulled from + + Returns + ------- + str: + Returns rekor uuid returned from upload command + + """ + + rekor_entry_path = self.write_working_file( + filename='entry.json', + contents=bytes(json.dumps(rekor_entry), 'utf-8') + ) + rekor_upload_stdout_result = StringIO() + rekor_upload_stdout_callback = create_sh_redirect_to_multiple_streams_fn_callback([ + sys.stdout, + rekor_upload_stdout_result + ]) + rekor = sh.rekor( # pylint: disable=no-member + 'upload', + '--rekor_server', + rekor_server, + '--entry', + rekor_entry_path, + _out=rekor_upload_stdout_callback, + _err_to_out=True, + _tee='out' + ) + rekor_uuid = str(rekor).split('/')[-1].strip(' \n') + return rekor_uuid + + def _run_step(self): + """Runs the step implemented by this StepImplementer. + + Returns + ------- + StepResult + Object containing the dictionary results of this step. + """ + step_result = StepResult.from_step_implementer(self) + rekor_server = self.get_value('rekor-server-url') + + work_dir = self.work_dir_path + artifact_to_sign_uri = self.get_value(self.artifact_to_sign_uri_config_key) + #Download artifact that needs to be signed and place at work_dir. + #Path to file is returned as string + path_to_file = download_source_to_destination( + artifact_to_sign_uri, + work_dir) + + # get the pgp private key to sign the image with + signer_pgp_private_key = self.get_value( + 'signer-pgp-private-key' + ) + + # import the PGP key and get the finger print + signer_pgp_private_key_fingerprint = import_pgp_key( + pgp_private_key=signer_pgp_private_key + ) + + signer_pgp_public_key = export_pgp_public_key(signer_pgp_private_key_fingerprint) + + rekor_entry = self._create_rekor_entry( + signer_pgp_public_key, + signer_pgp_private_key_fingerprint, + path_to_file, + artifact_to_sign_uri + ) + + rekor_uuid = self._upload_to_rekor( + rekor_server, + rekor_entry + ) + step_result.add_artifact( + name='rekor-entry', + value=rekor_entry + ) + step_result.add_artifact( + name='rekor-uuid', + value=rekor_uuid + ) + rekor_uri = rekor_server + '/api/v1/log/entries/' + rekor_uuid + step_result.add_artifact( + name='rekor-entry-uri', + value=rekor_uri + ) + return step_result diff --git a/src/ploigos_step_runner/utils/file.py b/src/ploigos_step_runner/utils/file.py index fdd259e3..70d5d599 100644 --- a/src/ploigos_step_runner/utils/file.py +++ b/src/ploigos_step_runner/utils/file.py @@ -60,12 +60,11 @@ def parse_yaml_or_json_file(yaml_or_json_file): return parsed_file -def download_and_decompress_source_to_destination( +def download_source_to_destination( source_uri, destination_dir ): - """Given a source url using a known protocol downloads the file to a given destination - and decompresses it if known compression method. + """Given a source url using a known protocol downloads the file to a given destination. Notes ----- @@ -74,9 +73,6 @@ def download_and_decompress_source_to_destination( * http:// * https:// - Known compression types - * bz2 - Parameters ---------- source_uri : url @@ -88,7 +84,7 @@ def download_and_decompress_source_to_destination( Returns ------- str - Path to the downloaded and decompressed (if needed) file from given source. + Path to the downloaded file from given source. Raises ------ @@ -137,6 +133,47 @@ def download_and_decompress_source_to_destination( "Unexpected error, should have been caught by step validation." f" Source ({source_uri}) must start with known protocol (/|file://|http://|https://)." ) + return destination_path + +def download_and_decompress_source_to_destination( + source_uri, + destination_dir +): + """Given a source url using a known protocol downloads the file to a given destination + and decompresses it if known compression method. + + Notes + ----- + Known source protocols + * file:// + * http:// + * https:// + + Known compression types + * bz2 + + Parameters + ---------- + source_uri : url + URL to a source file using a known protocol to download to destination folder + and decompress if necessary. + destination_dir : path + Path to directory to download and decompress if necessary the source url to. + + Returns + ------- + str + Path to the downloaded and decompressed (if needed) file from given source. + + Raises + ------ + RuntimeError + If error downloading file. + ValueError + If source_uri does not start with file://|http://|https:// + """ + + destination_path = download_source_to_destination(source_uri, destination_dir) # if extension is .bz2, decompress, else assume file is fine as as is file_path, file_extension = os.path.splitext(destination_path) diff --git a/src/ploigos_step_runner/utils/pgp.py b/src/ploigos_step_runner/utils/pgp.py index dc1e8fcf..f514aa9e 100644 --- a/src/ploigos_step_runner/utils/pgp.py +++ b/src/ploigos_step_runner/utils/pgp.py @@ -72,7 +72,7 @@ def import_pgp_key(pgp_private_key): # grp:::::::::A483EE079EC1D58A954E3AAF3BCC61EDD7596BF0: gpg_regex = re.compile(r"^fpr:+([^:]+):$", re.MULTILINE) - print("Import PGP private key to sign container image(s) with") + print("Import PGP private key to sign artifacts with") try: # import the key @@ -109,7 +109,7 @@ def import_pgp_key(pgp_private_key): pgp_private_key_fingerprint = gpg_imported_pgp_private_key_fingerprints[0] print( - "Imported PGP private key to sign container image(s) with: " + "Imported PGP private key to sign artifacts with: " f"fingerprint='{pgp_private_key_fingerprint}'" ) except sh.ErrorReturnCode as error: @@ -118,3 +118,41 @@ def import_pgp_key(pgp_private_key): ) from error return pgp_private_key_fingerprint + +def export_pgp_public_key(pgp_private_key_fingerprint): + """Exports a PGP public key given a private key fingerprint. + + Parameters + ---------- + pgp_private_key_fingerprint : str + PGP fingerprint. + + Returns + ------- + str + Public key from the private key fingerprint. + + Raises + ------ + RuntimeError + If error getting exported PGP public key + """ + try: + gpg_export_stdout_result = StringIO() + + sh.gpg( # pylint: disable=no-member + '--armor', + '--export', + pgp_private_key_fingerprint, + _out=gpg_export_stdout_result, + _err_to_out=True, + _tee='out' + ) + + gpg_public_key = gpg_export_stdout_result.getvalue() + except sh.ErrorReturnCode as error: + raise RuntimeError( + f"Error exporting pgp public key: {error}" + ) from error + + return gpg_public_key diff --git a/tests/step_implementers/automated_governance/test_rekor.py b/tests/step_implementers/automated_governance/test_rekor.py deleted file mode 100644 index dd5e603b..00000000 --- a/tests/step_implementers/automated_governance/test_rekor.py +++ /dev/null @@ -1,217 +0,0 @@ -import os -from io import IOBase -from pathlib import Path -from unittest.mock import patch - -import sh -from ploigos_step_runner import StepResult, WorkflowResult -from ploigos_step_runner.step_implementers.automated_governance import Rekor -from testfixtures import TempDirectory -from tests.helpers.base_step_implementer_test_case import \ - BaseStepImplementerTestCase -from tests.helpers.test_utils import Any - - -class TestStepImplementerAutomatedGovernanceRekor(BaseStepImplementerTestCase): - TEST_REKOR_ENTRY = { - "kind": "rekord", - "apiVersion": "0.0.1", - "spec": { - "signature": { - "format": "pgp", - "content": "", - "publicKey": { - "content": "" - } - }, - "data": { - "content": "ewogICAgInN0ZXAtcnVubmVyLXJlc3VsdHMiOiB7fQp9", - "hash": { - "algorithm": "sha256", - "value": "e2162714a1c0e2f6a362f0596514a8d37458db058cc82a728c3717c9275b1d81" - } - }, - "extraData": "ewogICAgInN0ZXAtcnVubmVyLXJlc3VsdHMiOiB7fQp9" - } - } - - TEST_REKOR_UUID = 'b08416d417acdb0610d4a030d8f697f9d0a718024681a00fa0b9ba67072a38b5' - TEST_REKOR_SERVER = 'http://rekor.apps.tssc.rht-set.com' - TEST_signer_pgp_private_key_user = 'tssc-python-package-tests' - - def create_step_implementer( - self, - step_config={}, - workflow_result=None, - parent_work_dir_path='' - ): - return self.create_given_step_implementer( - step_implementer=Rekor, - step_config=step_config, - step_name='automated-governance', - implementer='Rekor', - workflow_result=workflow_result, - parent_work_dir_path=parent_work_dir_path - ) - - def test__required_config_or_result_keys(self): - required_keys = Rekor._required_config_or_result_keys() - expected_required_keys = [ - 'rekor-server-url', - 'signer-pgp-public-key-path', - 'signer-pgp-private-key-user' - ] - self.assertEqual(required_keys, expected_required_keys) - - def test_create_rekor_entry(self): - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - signer_pgp_public_key_path = os.path.join( - os.path.dirname(__file__), - '../../helpers','files', - 'ploigos-step-runner-tests-public.key' - ) - - try: - sh.gpg('--import', signer_pgp_public_key_path) - except sh.ErrorReturnCode_2: - print("Key already imported.") - - # Write empty WorkflowResult to json file to use as Rekor extra data - extra_data_file = os.path.join(parent_work_dir_path, 'automated-governance', 'automated-governance.json') - extra_data_file_path = Path(extra_data_file) - WorkflowResult().write_results_to_json_file(extra_data_file_path) - - sig_file = extra_data_file + '.asc' - sig_file_path = Path(sig_file) - sig_file_path.touch() - - result = Rekor.create_rekor_entry( - signer_pgp_public_key_path=signer_pgp_public_key_path, - signer_pgp_private_key_user=TestStepImplementerAutomatedGovernanceRekor.TEST_signer_pgp_private_key_user, - extra_data_file=extra_data_file - ) - self.assertEqual(result['spec']['data']['hash']['value'], TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY['spec']['data']['hash']['value']) - self.assertEqual(result['spec']['extraData'], TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY['spec']['extraData']) - - @patch.object(Rekor, 'create_rekor_entry') - @patch('sh.rekor', create=True) - def test_upload_to_rekor(self, rekor_mock, create_mock): - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - signer_pgp_public_key_path = os.path.join( - os.path.dirname(__file__), - '../../helpers','files', - 'ploigos-step-runner-tests-public.key' - ) - - step_config = { - 'rekor-server-url': TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_SERVER, - 'signer-pgp-public-key-path': signer_pgp_public_key_path, - 'signer-pgp-private-key-user': TestStepImplementerAutomatedGovernanceRekor.TEST_signer_pgp_private_key_user - } - - step_implementer = self.create_step_implementer( - step_config=step_config, - parent_work_dir_path=parent_work_dir_path, - ) - - extra_data_file = os.path.join(parent_work_dir_path, 'automated-governance', 'automated-governance.json') - extra_data_file_path = Path(extra_data_file) - WorkflowResult().write_results_to_json_file(extra_data_file_path) - rekor_entry_path_name = os.path.join(parent_work_dir_path, 'automated-governance', 'entry.json') - - def create_mock_side_effect(signer_pgp_public_key_path, signer_pgp_private_key_user, extra_data_file): - return TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY - - def rekor_mock_side_effect(*args, **kwargs): - return 'Created entry at: ' + args[2]+ '/api/v1/log/entries/' + TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_UUID - - create_mock.side_effect = create_mock_side_effect - rekor_mock.side_effect = rekor_mock_side_effect - - rekor_entry_path = Path(rekor_entry_path_name) - rekor_entry_path.touch() - - result_entry, result_uuid = step_implementer.upload_to_rekor( - rekor_server=TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_SERVER, - extra_data_file=extra_data_file, - signer_pgp_public_key_path=signer_pgp_public_key_path, - signer_pgp_private_key_user=TestStepImplementerAutomatedGovernanceRekor.TEST_signer_pgp_private_key_user - ) - - rekor_mock.assert_called_once_with( - 'upload', - '--rekor_server', - TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_SERVER, - '--entry', - rekor_entry_path_name, - _out=Any(IOBase), - _err_to_out=True, - _tee='out' - ) - self.assertEqual(result_entry, TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY) - self.assertEqual(result_uuid, TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_UUID) - - @patch.object(Rekor, 'upload_to_rekor') - def test__run_step(self, upload_mock): - """Testing extra_data in rekor_entry - """ - with TempDirectory() as temp_dir: - parent_work_dir_path = os.path.join(temp_dir.path, 'working') - signer_pgp_public_key_path = os.path.join( - os.path.dirname(__file__), - '../../helpers','files', - 'ploigos-step-runner-tests-public.key' - ) - - step_config = { - 'rekor-server-url': TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_SERVER, - 'signer-pgp-public-key-path': signer_pgp_public_key_path, - 'signer-pgp-private-key-user': TestStepImplementerAutomatedGovernanceRekor.TEST_signer_pgp_private_key_user - } - - step_implementer = self.create_step_implementer( - step_config=step_config, - parent_work_dir_path=parent_work_dir_path, - ) - - expected_step_result = StepResult( - step_name='automated-governance', - sub_step_name='Rekor', - sub_step_implementer_name='Rekor' - ) - - expected_step_result.add_artifact( - name='rekor-entry', - value=TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY - ) - expected_step_result.add_artifact( - name='rekor-uuid', - value=TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_UUID - ) - - def upload_mock_side_effect( - rekor_server, - extra_data_file, - signer_pgp_public_key_path, - signer_pgp_private_key_user - ): - return TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_ENTRY, TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_UUID - - upload_mock.side_effect = upload_mock_side_effect - - extra_data_file = os.path.join( - step_implementer.work_dir_path, - 'automated-governance.json' - ) - - result = step_implementer._run_step() - upload_mock.assert_called_once_with( - rekor_server=TestStepImplementerAutomatedGovernanceRekor.TEST_REKOR_SERVER, - extra_data_file=extra_data_file, - signer_pgp_public_key_path=signer_pgp_public_key_path, - signer_pgp_private_key_user=TestStepImplementerAutomatedGovernanceRekor.TEST_signer_pgp_private_key_user - ) - - self.assertEqual(result.get_step_result_dict(), expected_step_result.get_step_result_dict()) diff --git a/tests/step_implementers/generate_evidence/test_generate_evidence.py b/tests/step_implementers/generate_evidence/test_generate_evidence.py index 57be1513..c116cda8 100644 --- a/tests/step_implementers/generate_evidence/test_generate_evidence.py +++ b/tests/step_implementers/generate_evidence/test_generate_evidence.py @@ -129,7 +129,7 @@ class TestStepImplementerGenerateEvidence_run_step(TestStepImplementerGenerateEv GenerateEvidence, '_GenerateEvidence__gather_evidence', return_value = None - ) + ) def test__run_step_pass_no_evidence(self, generate_evidence_mock): step_config = { 'organization': 'test-ORG', @@ -151,7 +151,7 @@ def test__run_step_pass_no_evidence(self, generate_evidence_mock): sub_step_implementer_name='GenerateEvidence' ) - + expected_step_result.add_artifact( name='result-generate-evidence', value='No evidence to generate.', @@ -210,7 +210,7 @@ def test__run_step_pass_with_evidence(self, generate_evidence_mock): GenerateEvidence, '_GenerateEvidence__gather_evidence', return_value = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT - ) + ) def test_run_step_write_results_to_json_file(self, gather_evidence_mock): expected_evidence = self.GATHER_EVIDENCE_MOCK_JSON @@ -232,7 +232,7 @@ def test_run_step_write_results_to_json_file(self, gather_evidence_mock): # run the step step_result = step_implementer._run_step() - + json_file_path = step_result.get_artifact_value("evidence-path") with open(json_file_path, 'r') as actual_json_file: @@ -245,7 +245,7 @@ def test_run_step_write_results_to_json_file(self, gather_evidence_mock): GenerateEvidence, '_GenerateEvidence__gather_evidence', return_value = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT - ) + ) def test__run_step_upload_to_file(self, gather_evidence_mock, upload_file_mock): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') @@ -308,10 +308,10 @@ def test__run_step_upload_to_file(self, gather_evidence_mock, upload_file_mock): GenerateEvidence, '_GenerateEvidence__gather_evidence', return_value = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT - ) + ) def test__upload_to_remote_with_auth(self, gather_evidence_mock, upload_file_mock): with TempDirectory() as temp_dir: - + parent_work_dir_path = os.path.join(temp_dir.path, 'working') step_config = { @@ -375,7 +375,7 @@ def test__upload_to_remote_with_auth(self, gather_evidence_mock, upload_file_moc GenerateEvidence, '_GenerateEvidence__gather_evidence', return_value = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT - ) + ) def test__run_step_pass_upload_to_remote_with_auth_failure(self, gather_evidence_mock, upload_file_mock): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') @@ -421,7 +421,7 @@ def test__run_step_pass_upload_to_remote_with_auth_failure(self, gather_evidence username='test-user', password='test-pass' ) - + class TestStepImplementerGenerateEvidence_gather_evidence(TestStepImplementerGenerateEvidenceBase): @@ -439,7 +439,7 @@ def __setup_evidence(self, parent_work_dir_path, evidence=True, environment=None sub_step_implementer_name='test-sub-step-implementer', environment=environment ) - + step_result.add_evidence( name='test-evidence', value="test-value", @@ -454,7 +454,7 @@ def __setup_evidence(self, parent_work_dir_path, evidence=True, environment=None workflow_result = WorkflowResult() workflow_result.add_step_result(step_result) - + step_implementer = self.create_step_implementer( step_config=step_config, parent_work_dir_path=parent_work_dir_path, @@ -465,11 +465,11 @@ def __setup_evidence(self, parent_work_dir_path, evidence=True, environment=None def test__gather_evidence(self): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') - + step_implementer = self.__setup_evidence(parent_work_dir_path) gathered_evidence = step_implementer._GenerateEvidence__gather_evidence() - + expected_gathered_evidence = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT self.assertEqual(gathered_evidence, expected_gathered_evidence) @@ -496,11 +496,11 @@ def test___gather_evidence_no_results(self): def test__gather_evidence_with_environment(self): with TempDirectory() as temp_dir: parent_work_dir_path = os.path.join(temp_dir.path, 'working') - + step_implementer = self.__setup_evidence(parent_work_dir_path, environment='foo') gathered_evidence = step_implementer._GenerateEvidence__gather_evidence() - + expected_gathered_evidence = TestStepImplementerGenerateEvidenceBase.GATHER_EVIDENCE_MOCK_DICT_WITH_ENVIRONMENT - self.assertEqual(gathered_evidence, expected_gathered_evidence) \ No newline at end of file + self.assertEqual(gathered_evidence, expected_gathered_evidence) diff --git a/tests/step_implementers/generate_evidence/test_rekor_evidence.py b/tests/step_implementers/generate_evidence/test_rekor_evidence.py new file mode 100644 index 00000000..562fafa5 --- /dev/null +++ b/tests/step_implementers/generate_evidence/test_rekor_evidence.py @@ -0,0 +1,41 @@ + +import os +from ploigos_step_runner.step_implementers.generate_evidence.rekor_sign_evidence import RekorSignEvidence +from testfixtures import TempDirectory +from tests.step_implementers.shared.test_rekor_sign_generic import \ + TestStepImplementerSharedRekorSignGeneric + +class TestStepImplementerRekorEvidence(TestStepImplementerSharedRekorSignGeneric): + def create_step_implementer_rekor_evidence( + self, + step_config={}, + workflow_result=None, + parent_work_dir_path='', + ): + + step_implementer = self.create_given_step_implementer( + step_implementer=RekorSignEvidence, + step_config=step_config, + step_name='automated-governance', + implementer='RekorSignEvidence', + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path + ) + return step_implementer + + def test__validate_rekor_constructor(self): + with TempDirectory() as temp_dir: + parent_work_dir_path = os.path.join(temp_dir.path, 'working') + step_config = { + 'rekor-server-url': self.TEST_REKOR_SERVER_URL, + 'signer-pgp-private-key': self.TEST_PGP_SIGNER_PRIVATE_KEY + } + step_implementer = self.create_step_implementer_rekor_evidence( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path + ) + + artifact_to_sign_uri_config_key = step_implementer.artifact_to_sign_uri_config_key + + self.assertEqual(artifact_to_sign_uri_config_key, 'evidence-uri') + diff --git a/tests/step_implementers/report/test_rekor_report.py b/tests/step_implementers/report/test_rekor_report.py new file mode 100644 index 00000000..326de82c --- /dev/null +++ b/tests/step_implementers/report/test_rekor_report.py @@ -0,0 +1,41 @@ + +import os +from ploigos_step_runner.step_implementers.report.rekor_sign_report import RekorSignReport +from testfixtures import TempDirectory +from tests.step_implementers.shared.test_rekor_sign_generic import \ + TestStepImplementerSharedRekorSignGeneric + +class TestStepImplementerRekorReport(TestStepImplementerSharedRekorSignGeneric): + def create_step_implementer_rekor_evidence( + self, + step_config={}, + workflow_result=None, + parent_work_dir_path='', + ): + + step_implementer = self.create_given_step_implementer( + step_implementer=RekorSignReport, + step_config=step_config, + step_name='report', + implementer='RekorSignReport', + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path + ) + return step_implementer + + def test__validate_rekor_constructor(self): + with TempDirectory() as temp_dir: + parent_work_dir_path = os.path.join(temp_dir.path, 'working') + step_config = { + 'rekor-server-url': self.TEST_REKOR_SERVER_URL, + 'signer-pgp-private-key': self.TEST_PGP_SIGNER_PRIVATE_KEY + } + step_implementer = self.create_step_implementer_rekor_evidence( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path + ) + + artifact_to_sign_uri_config_key = step_implementer.artifact_to_sign_uri_config_key + + self.assertEqual(artifact_to_sign_uri_config_key, 'results-archive-uri') + diff --git a/tests/step_implementers/shared/test_rekor_sign_generic.py b/tests/step_implementers/shared/test_rekor_sign_generic.py new file mode 100644 index 00000000..55050a44 --- /dev/null +++ b/tests/step_implementers/shared/test_rekor_sign_generic.py @@ -0,0 +1,327 @@ +import os +from io import IOBase +from pathlib import Path +from unittest.mock import patch + +import sh +from shutil import copy +from ploigos_step_runner import StepResult, WorkflowResult +from ploigos_step_runner.config.config import Config +from ploigos_step_runner.step_implementers.shared import RekorSignGeneric +from testfixtures import TempDirectory +from tests.helpers.base_step_implementer_test_case import \ + BaseStepImplementerTestCase +from tests.helpers.test_utils import Any +from ploigos_step_runner.utils.pgp import detach_sign_with_pgp_key +from ploigos_step_runner.utils.pgp import import_pgp_key +from ploigos_step_runner.utils.pgp import export_pgp_public_key + + +class TestStepImplementerSharedRekorSignGeneric(BaseStepImplementerTestCase): + TEST_REKOR_ENTRY = { + "kind": "rekord", + "apiVersion": "0.0.1", + "spec": { + "signature": { + "format": "pgp", + "content": "", + "publicKey": { + "content": "" + } + }, + "data": { + "content": "ewogICAgInN0ZXAtcnVubmVyLXJlc3VsdHMiOiB7fQp9", + "hash": { + "algorithm": "sha256", + "value": "7e9bb92be27b8897c4be4e2b5a185a0823bd4b1f566ef81312a96bbc278cf716" + } + }, + "extraData": { + "signed-artifact-uri": "http://foo.bar/artifact_to_sign" + } + } + } + + TEST_REKOR_UUID = 'b08416d417acdb0610d4a030d8f697f9d0a718024681a00fa0b9ba67072a38b5' + TEST_REKOR_SERVER_URL = 'http://rekor.apps.tssc.rht-set.com' + TEST_ARTIFACT_TO_SIGN_URI_CONFIG_KEY = 'test-key' + TEST_ARTIFACT_TO_SIGN_URI = 'http://foo.bar/artifact_to_sign' + TEST_PGP_SIGNER_PRIVATE_KEY = "" + TEST_REKOR_ENTRY_URI='http://rekor.apps.tssc.rht-set.com/api/v1/log/entries/' + TEST_REKOR_UUID + + TEST_FAKE_FINGERPRINT = 'DEB72DAF711EF3586266C98D9D65E8ECADD724B4' + + TEST_FAKE_PUBLIC_KEY = '''\ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQENBGDKRgEBCACyVB9ZZodlYBf5Fw5iWCohDyjFkypQjRFVaQz8XbvOzrU0reNy +fQpXFzEiNcGxhUHDM538Zv/adIOWMj3EvFTEuCY9I5DSfi4A8AfjSxLMUVSJ85f9 +rmHWKoCKoadF7FduKWnGEUk4woVNmhoVqpNMbV6IczNX2vmr7omx4/KgLu4k8WeW +3C0ItJES1fTz9epfynpndXZ9+Iiqiyx8d2cWcVtodv60bJkVIGdLhvyn3/vfSFvf +iSkw4FcKjpXt0k7Kd0e2mv5PHqEYWtkN41p6hDClu3i4EgnFwRjJK9H7lUPcttIJ +hS44e5HhLrihPVK91HZ5Eh0k5jpgQpkNV+ntABEBAAG0O0pvaG4gVGhvbXBraW5z +IChSZWQgSGF0KSAoTm8gY29tbWVudCkgPGp0aG9tcGtpQHJlZGhhdC5jb20+iQFO +BBMBCAA4FiEE3rctr3Ee81hiZsmNnWXo7K3XJLQFAmDKRgECGwMFCwkIBwIGFQoJ +CAsCBBYCAwECHgECF4AACgkQnWXo7K3XJLQANAgAjp5GQALhmhaxLXQJs9tmLzgB +VLOTn6r3z6emlIjjxZYlwLCV+xXZ12Igg+1L5IJo0Nrk2xmcAgp+3WScx29xtkbW +j44jt7eghH4clkSau8RTJWQlyMyzmLVGEDgqBr+YICcnV85NS31ILigvMorKgszP +RVJpG9G+GQRNm7L17sQUSYItX/4ZAbzBr/RhD4IRkgwFxlgrsVujcGN4UMQ2f+uy +b7jY7Gx4o3lKZc+2rFAp6Z9xkChdl69UzJA9Cb7MGNPDE2rFItqXQRUX+4bVOWjy +ffPOuPFqVZJ4989lAfNFh5zitVs2aYGMuWbrRp/AMqBY+2AWw+78/Gv2b6iPfbkB +DQRgykYBAQgAxYk3SS3wk3vGjGF8qAQ6AnMnU2XpqlGt9uTJnChDyzhb964Yxx6+ +vN3EqChoAROjyaUQN2GzmUezm1kNDa9sw4KE+a50d5nJqEniz4UvUUTzi9FluTb5 +EkRGr1zP0rd1PPWvMB9gil2lie4d7O9iwYT5KUDhiEB+LH8GvI0YENcYDN46CBvG +Qb89qhttXYp22DIll3mQPoabdmWFYD1xYRGoEwP/Zjkx+9MPOASStD4oaULkHoDn +qNMQNrVEY9YQ6JrG55a2XTLEcBaWmLyGnytRjVSs0xp8fzZCwTDgCkcoyAnNrQ7k +wiJJV3fvHP5i6meEdPdRSR4Bwc173ZzUdQARAQABiQE2BBgBCAAgFiEE3rctr3Ee +81hiZsmNnWXo7K3XJLQFAmDKRgECGwwACgkQnWXo7K3XJLS24AgAoGke/g08/FEz +Fml1e81zYEB6+GyggQS365JA16Ev+nvOQbjEk55PvZFfNMbsI0T7IamweX3BzlPf +Sb8HZsjJp0SNlhu7I1P4X3nMa2OofSEE8qd3ptEmAWqewhW+DqGDA23ccAO+VDSj +UvF+nhrYFM/BJY/SE5QyDcDRuJUG3hEAN0FPGSEbWZoAgPpQKdmwLwHeTtdadvAH +NNqw3gdbZjlF+RQbggHc4sXtK/H7wk3SU501BgP10GkfsFUekGiAlkAmYAO6gUnQ +1cfRMdTTzuGVc6j/8VshK1aYRR5A3g5rP4ZbNReqQddZmB20qe4ekkFKbRvHX/Em +B8pBNt1QOA== +=u6VF +-----END PGP PUBLIC KEY BLOCK----- +''' + + TEST_FAKE_PRIVATE_KEY = ''' + -----BEGIN RSA PRIVATE KEY----- + MIICXAIBAAKBgQCqGKukO1De7zhZj6+H0qtjTkVxwTCpvKe4eCZ0FPqri0cb2JZfXJ/DgYSF6vUp + wmJG8wVQZKjeGcjDOL5UlsuusFncCzWBQ7RKNUSesmQRMSGkVb1/3j+skZ6UtW+5u09lHNsj6tQ5 + 1s1SPrCBkedbNf0Tp0GbMJDyR4e9T04ZZwIDAQABAoGAFijko56+qGyN8M0RVyaRAXz++xTqHBLh + 3tx4VgMtrQ+WEgCjhoTwo23KMBAuJGSYnRmoBZM3lMfTKevIkAidPExvYCdm5dYq3XToLkkLv5L2 + pIIVOFMDG+KESnAFV7l2c+cnzRMW0+b6f8mR1CJzZuxVLL6Q02fvLi55/mbSYxECQQDeAw6fiIQX + GukBI4eMZZt4nscy2o12KyYner3VpoeE+Np2q+Z3pvAMd/aNzQ/W9WaI+NRfcxUJrmfPwIGm63il + AkEAxCL5HQb2bQr4ByorcMWm/hEP2MZzROV73yF41hPsRC9m66KrheO9HPTJuo3/9s5p+sqGxOlF + L0NDt4SkosjgGwJAFklyR1uZ/wPJjj611cdBcztlPdqoxssQGnh85BzCj/u3WqBpE2vjvyyvyI5k + X6zk7S0ljKtt2jny2+00VsBerQJBAJGC1Mg5Oydo5NwD6BiROrPxGo2bpTbu/fhrT8ebHkTz2epl + U9VQQSQzY1oZMVX8i1m5WUTLPz2yLJIBQVdXqhMCQBGoiuSoSjafUhV7i1cEGpb88h5NBYZzWXGZ + 37sJ5QsW+sJyoNde3xH8vdXhzU7eT82D6X/scw9RZz+/6rCJ4p0= + -----END RSA PRIVATE KEY-----''' + + def create_step_implementer( + self, + step_config={}, + workflow_result=None, + parent_work_dir_path='', + artifact_to_sign_uri_config_key=None + ): + + step_implementer = self.create_given_step_implementer_rekor_sign_generic( + step_implementer=RekorSignGeneric, + step_config=step_config, + step_name='automated-governance', + implementer='RekorSignGeneric', + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + artifact_to_sign_uri_config_key=artifact_to_sign_uri_config_key + ) + return step_implementer + + def create_given_step_implementer_rekor_sign_generic( + self, + step_implementer, + step_config={}, + step_name='', + environment=None, + implementer='', + workflow_result=None, + parent_work_dir_path='', + artifact_to_sign_uri_config_key=None + ): + config = Config({ + Config.CONFIG_KEY: { + step_name: [ + { + 'implementer': implementer, + 'config': step_config + } + ] + + } + }) + step_config = config.get_step_config(step_name) + sub_step_config = step_config.get_sub_step(implementer) + + if not workflow_result: + workflow_result = WorkflowResult() + + step_implementer = step_implementer( + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + config=sub_step_config, + artifact_to_sign_uri_config_key=artifact_to_sign_uri_config_key, + environment=environment + ) + return step_implementer + + def test__required_config_or_result_keys(self): + required_keys = RekorSignGeneric._required_config_or_result_keys() + expected_required_keys = [ + 'rekor-server-url', + 'signer-pgp-private-key' + ] + self.assertEqual(required_keys, expected_required_keys) + + @patch('ploigos_step_runner.step_implementers.shared.rekor_sign_generic.detach_sign_with_pgp_key') + @patch('ploigos_step_runner.step_implementers.shared.rekor_sign_generic.base64_encode') + def test_create_rekor_entry(self, + base64_encode_mock, + detach_sign_with_pgp_key_mock): + with TempDirectory() as temp_dir: + signer_pgp_public_key_path = os.path.join( + os.path.dirname(__file__), + '../../helpers','files', + 'ploigos-step-runner-tests-public.key' + ) + + #Copy key file to temp directory since _create_rekor_entry outputs a detached + #signature to a new file. + tmp_key_path = os.path.join(temp_dir.path, 'public.key') + copy(signer_pgp_public_key_path, tmp_key_path) + + signer_pgp_private_key_fingerprint = self.TEST_FAKE_FINGERPRINT + + signer_pgp_public_key = self.TEST_FAKE_PUBLIC_KEY + + base64_encode_mock.return_value = "mock64" + + result = RekorSignGeneric._create_rekor_entry(self, + signer_pgp_public_key, + signer_pgp_private_key_fingerprint, + path_to_file=tmp_key_path, + artifact_to_sign_uri=self.TEST_ARTIFACT_TO_SIGN_URI + ) + + self.assertEqual(result['spec']['data']['hash']['value'], self.TEST_REKOR_ENTRY['spec']['data']['hash']['value']) + self.assertEqual(result['spec']['extraData'], self.TEST_REKOR_ENTRY['spec']['extraData']) + + detach_sign_with_pgp_key_mock.assert_called_once_with( + output_signature_path = tmp_key_path + '.asc', + file_to_sign_path = tmp_key_path, + pgp_private_key_fingerprint=signer_pgp_private_key_fingerprint + ) + + base64_encode_mock.assert_any_call( + tmp_key_path + '.asc', + ) + + + @patch.object(RekorSignGeneric, '_create_rekor_entry') + @patch('sh.rekor', create=True) + def test_upload_to_rekor(self, rekor_mock, create_mock): + with TempDirectory() as temp_dir: + parent_work_dir_path = os.path.join(temp_dir.path, 'working') + signer_pgp_public_key_path = os.path.join( + os.path.dirname(__file__), + '../../helpers','files', + 'ploigos-step-runner-tests-public.key' + ) + + step_config = { + 'rekor-server-url': self.TEST_REKOR_SERVER_URL, + 'signer-pgp-private-key': self.TEST_PGP_SIGNER_PRIVATE_KEY + } + + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + artifact_to_sign_uri_config_key=self.TEST_ARTIFACT_TO_SIGN_URI + ) + + artifact_data_file = os.path.join(parent_work_dir_path, 'automated-governance', 'automated-governance.json') + artifact_data_file_path = Path(artifact_data_file) + WorkflowResult().write_results_to_json_file(artifact_data_file_path) + rekor_entry_path_name = os.path.join(parent_work_dir_path, 'automated-governance', 'entry.json') + + def create_mock_side_effect(signer_pgp_public_key_path, signer_pgp_private_key_user, extra_data_file): + return self.TEST_REKOR_ENTRY + + def rekor_mock_side_effect(*args, **kwargs): + return 'Created entry at: ' + args[2]+ '/api/v1/log/entries/' + self.TEST_REKOR_UUID + + create_mock.side_effect = create_mock_side_effect + rekor_mock.side_effect = rekor_mock_side_effect + + rekor_entry_path = Path(rekor_entry_path_name) + rekor_entry_path.touch() + + result_uuid = step_implementer._upload_to_rekor( + rekor_server=self.TEST_REKOR_SERVER_URL, + rekor_entry=self.TEST_REKOR_ENTRY + ) + + rekor_mock.assert_called_once_with( + 'upload', + '--rekor_server', + self.TEST_REKOR_SERVER_URL, + '--entry', + rekor_entry_path_name, + _out=Any(IOBase), + _err_to_out=True, + _tee='out' + ) + self.assertEqual(result_uuid, self.TEST_REKOR_UUID) + @patch('ploigos_step_runner.step_implementers.shared.rekor_sign_generic.export_pgp_public_key') + @patch('ploigos_step_runner.step_implementers.shared.rekor_sign_generic.import_pgp_key') + @patch('ploigos_step_runner.step_implementers.shared.rekor_sign_generic.download_source_to_destination') + @patch.object(RekorSignGeneric, '_upload_to_rekor') + @patch.object(RekorSignGeneric, '_create_rekor_entry') + def test__run_step(self, create_mock, + upload_mock, + download_source_to_destination_mock, + import_pgp_key_mock, + export_pgp_public_key_mock): + + with TempDirectory() as temp_dir: + parent_work_dir_path = os.path.join(temp_dir.path, 'working') + signer_pgp_public_key_path = os.path.join( + os.path.dirname(__file__), + '../../helpers','files', + 'ploigos-step-runner-tests-public.key' + ) + + step_config = { + 'rekor-server-url': self.TEST_REKOR_SERVER_URL, + 'signer-pgp-private-key': self.TEST_FAKE_PRIVATE_KEY, + self.TEST_ARTIFACT_TO_SIGN_URI_CONFIG_KEY: self.TEST_ARTIFACT_TO_SIGN_URI + } + + step_implementer = self.create_step_implementer( + step_config=step_config, + parent_work_dir_path=parent_work_dir_path, + artifact_to_sign_uri_config_key=self.TEST_ARTIFACT_TO_SIGN_URI_CONFIG_KEY + ) + + expected_step_result = StepResult( + step_name='automated-governance', + sub_step_name='RekorSignGeneric', + sub_step_implementer_name='RekorSignGeneric' + ) + + expected_step_result.add_artifact( + name='rekor-entry', + value=self.TEST_REKOR_ENTRY + ) + expected_step_result.add_artifact( + name='rekor-uuid', + value=self.TEST_REKOR_UUID + ) + rekor_uri = self.TEST_REKOR_ENTRY_URI + expected_step_result.add_artifact( + name='rekor-entry-uri', + value=rekor_uri + + ) + + download_source_to_destination_mock.return_value = signer_pgp_public_key_path + import_pgp_key_mock.return_value = self.TEST_FAKE_FINGERPRINT + export_pgp_public_key_mock.return_value = self.TEST_FAKE_PUBLIC_KEY + + create_mock.return_value = self.TEST_REKOR_ENTRY + upload_mock.return_value = self.TEST_REKOR_UUID + + result = step_implementer._run_step() + + self.assertEqual(result, expected_step_result) diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index 3d409d38..0bd623c2 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -6,6 +6,7 @@ from ploigos_step_runner.utils.file import ( base64_encode, create_parent_dir, + download_source_to_destination, download_and_decompress_source_to_destination, get_file_hash, parse_yaml_or_json_file, upload_file) from testfixtures import TempDirectory @@ -143,41 +144,82 @@ def test_https_bad_uri(self): destination_dir=test_dir.path ) - def test_create_parent_dir(self): +class TestDownloadSourceToDestination(BaseTestCase): + def test_https_xml(self): with TempDirectory() as test_dir: - file_path = os.path.join(test_dir.path, 'hello/world/does/not/exit/foo.yml') + destination_path = download_source_to_destination( + source_uri="https://www.redhat.com/security/data/cvrf/2020/cvrf-rhba-2020-0017.xml", + destination_dir=test_dir.path + ) - self.assertFalse(os.path.exists(file_path)) - self.assertFalse(os.path.exists(os.path.dirname(file_path))) + self.assertIsNotNone(destination_path) + self.assertRegex(destination_path, rf'{test_dir.path}/cvrf-rhba-2020-0017.xml$') + with open(destination_path) as downloaded_file: + self.assertTrue(downloaded_file.read()) - create_parent_dir(file_path) - self.assertFalse(os.path.exists(file_path)) - self.assertTrue(os.path.exists(os.path.dirname(file_path))) + def test_local_file_download_file_prefix(self): + sample_file_path = os.path.join( + os.path.dirname(__file__), + 'files', + 'cvrf-rhba-2020-0017.xml' + ) - def test_base64_encode(self): with TempDirectory() as test_dir: - sample_file_path = os.path.join( - os.path.dirname(__file__), - 'files', - 'sample.txt' + destination_path = download_source_to_destination( + source_uri=f"file://{sample_file_path}", + destination_dir=test_dir.path ) - result = base64_encode(sample_file_path) - self.assertEqual(result, - 'c2FtcGxlIHRleHQgZmlsZQ==' - ) + self.assertIsNotNone(destination_path) + self.assertRegex(destination_path, rf'{test_dir.path}/cvrf-rhba-2020-0017.xml$') + with open(destination_path) as downloaded_file, open(sample_file_path) as sample_file: + downloaded_file_contents = downloaded_file.read() + self.assertTrue(downloaded_file_contents) + self.assertEqual(downloaded_file_contents, sample_file.read()) + + def test_local_file_download_forward_slash_prefix(self): + sample_file_path = os.path.join( + os.path.dirname(__file__), + 'files', + 'cvrf-rhba-2020-0017.xml' + ) - def test_get_file_hash(self): with TempDirectory() as test_dir: - sample_file_path = os.path.join( - os.path.dirname(__file__), - 'files', - 'sample.txt' + destination_path = download_source_to_destination( + source_uri=f"{sample_file_path}", + destination_dir=test_dir.path ) - result = get_file_hash(sample_file_path) - self.assertEqual(result, '09daa01246aa5ee9c29f64f644627a0ea83247857dfea2665689e26b166eef47') + self.assertIsNotNone(destination_path) + self.assertRegex(destination_path, rf'{test_dir.path}/cvrf-rhba-2020-0017.xml$') + with open(destination_path) as downloaded_file, open(sample_file_path) as sample_file: + downloaded_file_contents = downloaded_file.read() + self.assertTrue(downloaded_file_contents) + self.assertEqual(downloaded_file_contents, sample_file.read()) + + def test_bad_protocol(self): + with TempDirectory() as test_dir: + + with self.assertRaisesRegex( + ValueError, + r"Unexpected error, should have been caught by step validation." + r" Source \(.+\) must start with known protocol \(/|file://\|http://\|https://\)." + ): + download_source_to_destination( + source_uri="bad://www.redhat.com/security/data/metrics/ds/v2/RHEL8/rhel-8.ds.xml.bz2", + destination_dir=test_dir.path + ) + def test_https_bad_uri(self): + with TempDirectory() as test_dir: + with self.assertRaisesRegex( + RuntimeError, + r"Error downloading file \(.+\): HTTP Error 404: Not Found" + ): + download_source_to_destination( + source_uri="https://www.redhat.com/security/data/metrics/ds/v2/RHEL8/does-not-exist.ds.xml.bz2", + destination_dir=test_dir.path + ) class TestUploadFile(BaseTestCase): def __create_http_response_side_effect(self, read_return): def http_response_side_effect(request): @@ -359,3 +401,38 @@ def test_https_prefix_http_error(self, opener_open_mock): file_path=sample_file_path, destination_uri="https://ploigos.com/test/foo42" ) +class TestFileMisc(BaseTestCase): + def test_create_parent_dir(self): + with TempDirectory() as test_dir: + file_path = os.path.join(test_dir.path, 'hello/world/does/not/exit/foo.yml') + + self.assertFalse(os.path.exists(file_path)) + self.assertFalse(os.path.exists(os.path.dirname(file_path))) + + create_parent_dir(file_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.exists(os.path.dirname(file_path))) + + def test_base64_encode(self): + with TempDirectory() as test_dir: + sample_file_path = os.path.join( + os.path.dirname(__file__), + 'files', + 'sample.txt' + ) + + result = base64_encode(sample_file_path) + self.assertEqual(result, + 'c2FtcGxlIHRleHQgZmlsZQ==' + ) + + def test_get_file_hash(self): + with TempDirectory() as test_dir: + sample_file_path = os.path.join( + os.path.dirname(__file__), + 'files', + 'sample.txt' + ) + + result = get_file_hash(sample_file_path) + self.assertEqual(result, '09daa01246aa5ee9c29f64f644627a0ea83247857dfea2665689e26b166eef47') \ No newline at end of file diff --git a/tests/utils/test_pgp.py b/tests/utils/test_pgp.py index 489dd09b..1817d249 100644 --- a/tests/utils/test_pgp.py +++ b/tests/utils/test_pgp.py @@ -7,7 +7,7 @@ from tests.helpers.test_utils import Any from unittest.mock import patch -from ploigos_step_runner.utils.pgp import (detach_sign_with_pgp_key, import_pgp_key) +from ploigos_step_runner.utils.pgp import (detach_sign_with_pgp_key, import_pgp_key, export_pgp_public_key) from tests.helpers.base_test_case import BaseTestCase from ploigos_step_runner.exceptions import StepRunnerException @@ -163,3 +163,95 @@ def test_detach_sign_with_pgp_key_failure(self, gpg_mock): '--detach-sign', '/mock/file-to-detach-sign.txt' ) + +class TestExportPGPKey(BaseTestCase): + + TEST_FAKE_FINGERPRINT = 'DEB72DAF711EF3586266C98D9D65E8ECADD724B4' + + TEST_FAKE_PUBLIC_KEY = '''\ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQENBGDKRgEBCACyVB9ZZodlYBf5Fw5iWCohDyjFkypQjRFVaQz8XbvOzrU0reNy +fQpXFzEiNcGxhUHDM538Zv/adIOWMj3EvFTEuCY9I5DSfi4A8AfjSxLMUVSJ85f9 +rmHWKoCKoadF7FduKWnGEUk4woVNmhoVqpNMbV6IczNX2vmr7omx4/KgLu4k8WeW +3C0ItJES1fTz9epfynpndXZ9+Iiqiyx8d2cWcVtodv60bJkVIGdLhvyn3/vfSFvf +iSkw4FcKjpXt0k7Kd0e2mv5PHqEYWtkN41p6hDClu3i4EgnFwRjJK9H7lUPcttIJ +hS44e5HhLrihPVK91HZ5Eh0k5jpgQpkNV+ntABEBAAG0O0pvaG4gVGhvbXBraW5z +IChSZWQgSGF0KSAoTm8gY29tbWVudCkgPGp0aG9tcGtpQHJlZGhhdC5jb20+iQFO +BBMBCAA4FiEE3rctr3Ee81hiZsmNnWXo7K3XJLQFAmDKRgECGwMFCwkIBwIGFQoJ +CAsCBBYCAwECHgECF4AACgkQnWXo7K3XJLQANAgAjp5GQALhmhaxLXQJs9tmLzgB +VLOTn6r3z6emlIjjxZYlwLCV+xXZ12Igg+1L5IJo0Nrk2xmcAgp+3WScx29xtkbW +j44jt7eghH4clkSau8RTJWQlyMyzmLVGEDgqBr+YICcnV85NS31ILigvMorKgszP +RVJpG9G+GQRNm7L17sQUSYItX/4ZAbzBr/RhD4IRkgwFxlgrsVujcGN4UMQ2f+uy +b7jY7Gx4o3lKZc+2rFAp6Z9xkChdl69UzJA9Cb7MGNPDE2rFItqXQRUX+4bVOWjy +ffPOuPFqVZJ4989lAfNFh5zitVs2aYGMuWbrRp/AMqBY+2AWw+78/Gv2b6iPfbkB +DQRgykYBAQgAxYk3SS3wk3vGjGF8qAQ6AnMnU2XpqlGt9uTJnChDyzhb964Yxx6+ +vN3EqChoAROjyaUQN2GzmUezm1kNDa9sw4KE+a50d5nJqEniz4UvUUTzi9FluTb5 +EkRGr1zP0rd1PPWvMB9gil2lie4d7O9iwYT5KUDhiEB+LH8GvI0YENcYDN46CBvG +Qb89qhttXYp22DIll3mQPoabdmWFYD1xYRGoEwP/Zjkx+9MPOASStD4oaULkHoDn +qNMQNrVEY9YQ6JrG55a2XTLEcBaWmLyGnytRjVSs0xp8fzZCwTDgCkcoyAnNrQ7k +wiJJV3fvHP5i6meEdPdRSR4Bwc173ZzUdQARAQABiQE2BBgBCAAgFiEE3rctr3Ee +81hiZsmNnWXo7K3XJLQFAmDKRgECGwwACgkQnWXo7K3XJLS24AgAoGke/g08/FEz +Fml1e81zYEB6+GyggQS365JA16Ev+nvOQbjEk55PvZFfNMbsI0T7IamweX3BzlPf +Sb8HZsjJp0SNlhu7I1P4X3nMa2OofSEE8qd3ptEmAWqewhW+DqGDA23ccAO+VDSj +UvF+nhrYFM/BJY/SE5QyDcDRuJUG3hEAN0FPGSEbWZoAgPpQKdmwLwHeTtdadvAH +NNqw3gdbZjlF+RQbggHc4sXtK/H7wk3SU501BgP10GkfsFUekGiAlkAmYAO6gUnQ +1cfRMdTTzuGVc6j/8VshK1aYRR5A3g5rP4ZbNReqQddZmB20qe4ekkFKbRvHX/Em +B8pBNt1QOA== +=u6VF +-----END PGP PUBLIC KEY BLOCK----- +''' + + + @staticmethod + def gpg_side_effect(*_args, **kwargs): + """Side effect for gpg key load""" + kwargs['_out']('gpg: WARNING: nothing exported') + + @patch('sh.gpg', create=True) + def test_export_pgp_public_key_success(self, gpg_mock): + gpg_mock.side_effect = TestExportPGPKey.TEST_FAKE_PUBLIC_KEY + + pgp_private_key_fingerprint = TestExportPGPKey.TEST_FAKE_FINGERPRINT + + export_pgp_public_key( + pgp_private_key_fingerprint=pgp_private_key_fingerprint + ) + gpg_mock.assert_called_once_with( + '--armor', + '--export', + pgp_private_key_fingerprint, + _out=Any(IOBase), + _err_to_out=True, + _tee='out' + ) + + + @patch('sh.gpg', create=True) + def test_export_pgp_public_key_failure(self, gpg_mock): + gpg_mock.side_effect = sh.ErrorReturnCode('gpg', b'mock stdout', b'mock error') + with self.assertRaisesRegex( + RuntimeError, + re.compile( + r'Error exporting pgp public key:' + r".*RAN: gpg" + r".*STDOUT:" + r".*mock stdout" + r".*STDERR:" + r".*mock error", + re.DOTALL + ) + ): + pgp_private_key_fingerprint = TestExportPGPKey.TEST_FAKE_FINGERPRINT + export_pgp_public_key( + pgp_private_key_fingerprint=pgp_private_key_fingerprint + ) + + gpg_mock.assert_called_once_with( + '--armor', + '--export', + pgp_private_key_fingerprint, + _out=Any(IOBase), + _err_to_out=True, + _tee='out' + ) \ No newline at end of file