diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index c18f85e..e61f609 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -17,10 +17,14 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: 3.x + python-version: '3.12' - name: Install dependencies - run: pip install -r requirements.txt + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pytest - name: Run pytest - run: python -m pytest + run: | + python -m pytest -v tests/ \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..ab78836 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,116 @@ +## Local setup + +Clone the reposistory to your machine and install the required dependencies. + +### Create a virtual environment + +```fish +python3 -m venv.venv +``` + +### Install dependencies + +```fish +pip install -r requirements.txt +``` + +### Demo script + +This demo python script will create, read, update and delete secrets via the SDK. Just update the host, app, env and token constants at the top. + +```python +from src.phase import Phase, CreateSecretsOptions, GetAllSecretsOptions, UpdateSecretOptions, DeleteSecretOptions + +CONSOLE_HOST = 'https://console.phase.dev' +APP_NAME = '<app-name>' +ENV_NAME = "<env-name>" +TOKEN = '<service-token>' + +# Initialize the Phase object with host and service token +phase = Phase(init=False, + pss=TOKEN, + host=CONSOLE_HOST) + +# Create secrets with references +create_options = CreateSecretsOptions( + env_name=ENV_NAME, + app_name=APP_NAME, + key_value_pairs=[ + {"BASE_URL": "https://api.example.com"}, + {"API_ENDPOINT": "${BASE_URL}/v1/data"}, + {"NESTED_REF": "Nested ${API_ENDPOINT}"} + ] +) +create_result = phase.create_secrets(create_options) +print(f"Create secrets result: {create_result}") + +# Read and resolve references +get_options = GetAllSecretsOptions( + env_name=ENV_NAME, + app_name=APP_NAME +) +secrets = phase.get_all_secrets(get_options) + +resolved_secrets = phase.resolve_references(secrets, ENV_NAME, APP_NAME) + +print("\nResolved Secrets:") +print("----------------") +for secret in resolved_secrets: + print(f"{secret.key}: {secret.value}") + +# Update secrets +update_options = UpdateSecretOptions( + env_name=ENV_NAME, + app_name=APP_NAME, + key="BASE_URL", + value="https://api.acme.com", + secret_path="/", + destination_path="/", # Optional: move secret to a new path + override=False, # Optional: create a personal override + toggle_override=False # Optional: toggle personal override +) +update_result = phase.update_secret(update_options) + +print(f"\nUpdate secrets result: {update_result}") +print("----------------") + + +## Refetch secrets +secrets = phase.get_all_secrets(get_options) + +resolved_secrets = phase.resolve_references(secrets, ENV_NAME, APP_NAME) + +print("\nResolved Secrets:") +print("----------------") +for secret in resolved_secrets: + print(f"{secret.key}: {secret.value}") + + +# Delete secrets +delete_options = DeleteSecretOptions( + env_name=ENV_NAME, + app_name=APP_NAME, + key_to_delete="BASE_URL", + secret_path="/" +) +result = phase.delete_secret(delete_options) +print(f"Delete result: {result}") + +## Refetch secrets +secrets = phase.get_all_secrets(get_options) + +resolved_secrets = phase.resolve_references(secrets, ENV_NAME, APP_NAME) + +print("\nResolved Secrets:") +print("----------------") +for secret in resolved_secrets: + print(f"{secret.key}: {secret.value}") +``` + +## Running Tests + +Run the test suite with: + +```fish +python -m pytest -v tests/ +``` diff --git a/README.md b/README.md index cb8e690..76df54e 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,143 @@ # Python SDK for Phase -SDK to integrate Phase in server-side applications running Python +SDK to integrate Phase in server-side applications running Python. This SDK allows you to manage secrets securely using the Phase platform. ## Install -`pip install phase-dev` +``` +pip install phase-dev +``` ## Import ```python -from phase import Phase; +from phase import Phase, CreateSecretsOptions, GetAllSecretsOptions, GetSecretOptions, UpdateSecretOptions, DeleteSecretOptions ``` ## Initialize -Initialize the SDK with your `APP_ID` and `APP_SECRET`: +Initialize the SDK with your host and token: ```python -phase = Phase(APP_ID, APP_SECRET) +phase = Phase( + init=False, + host='https://your-phase-host.com', + pss=PHASE_SERVICE_TOKEN + +) ``` ## Usage -### Encrypt +### Create Secrets + +Create one or more secrets in a specified application and environment: + +```python +create_options = CreateSecretsOptions( + env_name="Development", + app_name="Your App Name", + key_value_pairs=[ + {"API_KEY": "your-api-key"}, + {"DB_PASSWORD": "your-db-password"} + ], + secret_path="/api" +) +result = phase.create_secrets(create_options) +print(f"Create secrets result: {result}") +``` + +### Get Secrets + +Fetch one or more secrets from a specified application and environment: + +```python +get_options = GetAllSecretsOptions( + env_name="Development", + app_name="Your App Name", + tag="api", # Optional: filter by tag + secret_path="/api" # Optional: specify path +) +secrets = phase.get_all_secrets(get_options) +for secret in secrets: + print(f"Key: {secret.key}, Value: {secret.value}") +``` + +To get a specific secret: ```python -ciphertext = phase.encrypt("hello world"); +get_options = GetSecretOptions( + env_name="Development", + app_name="Your App Name", + key_to_find="API_KEY", + secret_path="/api" +) +secret = phase.get_secret(get_options) +if secret: + print(f"Key: {secret.key}, Value: {secret.value}") ``` -### Decrypt +### Update Secrets + +Update an existing secret in a specified application and environment: ```python -plaintext = phase.decrypt(ciphertext); +update_options = UpdateSecretOptions( + env_name="Development", + app_name="Your App Name", + key="API_KEY", + value="new-api-key-value", + secret_path="/api", + destination_path="/new-api", # Optional: move secret to a new path + override=False, # Optional: create a personal override + toggle_override=False # Optional: toggle personal override +) +result = phase.update_secret(update_options) +print(f"Update result: {result}") ``` + +### Delete Secrets + +Delete a secret from a specified application and environment: + +```python +delete_options = DeleteSecretOptions( + env_name="Development", + app_name="Your App Name", + key_to_delete="API_KEY", + secret_path="/api" +) +result = phase.delete_secret(delete_options) +print(f"Delete result: {result}") +``` + +### Resolve Secret References + +Resolve references in secret values: + +```python +get_options = GetAllSecretsOptions( + env_name="Development", + app_name="Your App Name" +) +secrets = phase.get_all_secrets(get_options) +resolved_secrets = phase.resolve_references(secrets, "Development", "Your App Name") +for secret in resolved_secrets: + print(f"Key: {secret.key}, Resolved Value: {secret.value}") +``` + +## Error Handling + +The SDK methods may raise exceptions for various error conditions. It's recommended to wrap SDK calls in try-except blocks to handle potential errors: + +```python +try: + get_options = GetAllSecretsOptions(env_name="Development", app_name="Your App Name") + secrets = phase.get_all_secrets(get_options) +except ValueError as e: + print(f"An error occurred: {e}") +``` + +## Note on Security + +Never hard-code sensitive information like tokens or secrets directly in your code. Always use environment variables or secure configuration management to provide these values to your application. diff --git a/pyproject.toml b/pyproject.toml index 24f761c..c9d691e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "phase_dev" -version = "1.1.0" -description = "Python SDK for Phase" +version = "2.0.0" +description = "Python SDK for Phase secrets manager" readme = "README.md" requires-python = ">=3.10" classifiers = [ diff --git a/src/phase/__init__.py b/src/phase/__init__.py index ff34f12..fd01ad2 100644 --- a/src/phase/__init__.py +++ b/src/phase/__init__.py @@ -1,3 +1,19 @@ -from .phase import Phase +from .phase import ( + Phase, + GetSecretOptions, + GetAllSecretsOptions, + CreateSecretsOptions, + UpdateSecretOptions, + DeleteSecretOptions, + PhaseSecret +) -__all__ = ['Phase'] +__all__ = [ + 'Phase', + 'GetSecretOptions', + 'GetAllSecretsOptions', + 'CreateSecretsOptions', + 'UpdateSecretOptions', + 'DeleteSecretOptions', + 'PhaseSecret' +] \ No newline at end of file diff --git a/src/phase/phase.py b/src/phase/phase.py index 0e9fa95..602fa68 100644 --- a/src/phase/phase.py +++ b/src/phase/phase.py @@ -1,101 +1,152 @@ -import re -from nacl.bindings import crypto_kx_server_session_keys, crypto_kx_client_session_keys -from dataclasses import dataclass -from .utils.crypto import decrypt_b64, encrypt_b64, fetch_app_key, random_key_pair, reconstruct_secret -from .version import __version__, __ph_version__ - -DEFAULT_KMS_HOST = "https://kms.phase.dev" - +from dataclasses import dataclass, field +from typing import List, Dict, Optional +from .utils.phase_io import Phase as PhaseIO +from .utils.secret_referencing import resolve_all_secrets @dataclass -class AppSecret: - prefix: str - pss_version: str - app_token: str - keyshare0: str - keyshare1_unwrap_key: str - - -class Phase: - _app_id = '' - _app_pub_key = '' - _app_secret = None - _kms_host = '' - - def __init__(self, app_id, app_secret, custom_kms_host=None): - app_id_pattern = re.compile(r"^phApp:v(\d+):([a-fA-F0-9]{64})$") - app_secret_pattern = re.compile( - r"^pss:v(\d+):([a-fA-F0-9]{64}):([a-fA-F0-9]{64,128}):([a-fA-F0-9]{64})$") - - if not app_id_pattern.match(app_id): - raise ValueError("Invalid Phase APP_ID") - - if not app_secret_pattern.match(app_secret): - raise ValueError("Invalid Phase APP_SECRET") - - self._app_id = app_id - self._app_pub_key = app_id.split(':')[2] - - app_secret_segments = app_secret.split(':') - self._app_secret = AppSecret(*app_secret_segments) - self._kms_host = f"{custom_kms_host}/kms" if custom_kms_host else DEFAULT_KMS_HOST - - def encrypt(self, plaintext, tag="") -> str | None: - """ - Encrypts a plaintext string. - - Args: - plaintext (str): The plaintext to encrypt. - tag (str, optional): A tag to include in the encrypted message. The tag will not be encrypted. - - Returns: - str: The encrypted message, formatted as a string that includes the public key used for the one-time keypair, - the ciphertext, and the tag. Returns `None` if an error occurs. - """ - try: - one_time_keypair = random_key_pair() - symmetric_keys = crypto_kx_client_session_keys( - one_time_keypair[0], one_time_keypair[1], bytes.fromhex(self._app_pub_key)) - ciphertext = encrypt_b64(plaintext, symmetric_keys[1]) - pub_key = one_time_keypair[0].hex() +class GetSecretOptions: + env_name: str + app_name: str + key_to_find: Optional[str] = None + tag: Optional[str] = None + secret_path: str = "/" - return f"ph:{__ph_version__}:{pub_key}:{ciphertext}:{tag}" - except ValueError as err: - raise ValueError(f"Something went wrong: {err}") - - def decrypt(self, phase_ciphertext) -> str | None: - """ - Decrypts a Phase ciphertext string. - - Args: - phase_ciphertext (str): The encrypted message to decrypt. - - Returns: - str: The decrypted plaintext as a string. Returns `None` if an error occurs. - - Raises: - ValueError: If the ciphertext is not in the expected format (e.g. wrong prefix, wrong number of fields). - """ - - try: - [prefix, version, client_pub_key_hex, ct, - tag] = phase_ciphertext.split(':') - if prefix != 'ph' or len(phase_ciphertext.split(':')) != 5: - raise ValueError('Ciphertext is invalid') - client_pub_key = bytes.fromhex(client_pub_key_hex) - - keyshare1 = fetch_app_key( - self._app_secret.app_token, self._app_secret.keyshare1_unwrap_key, self._app_id, len(ct)/2, self._kms_host) +@dataclass +class GetAllSecretsOptions: + env_name: str + app_name: str + tag: Optional[str] = None + secret_path: str = "/" - app_priv_key = reconstruct_secret( - [self._app_secret.keyshare0, keyshare1]) +@dataclass +class CreateSecretsOptions: + env_name: str + app_name: str + key_value_pairs: List[Dict[str, str]] + secret_path: str = "/" - session_keys = crypto_kx_server_session_keys(bytes.fromhex( - self._app_pub_key), bytes.fromhex(app_priv_key), client_pub_key) +@dataclass +class UpdateSecretOptions: + env_name: str + app_name: str + key: str + value: Optional[str] = None + secret_path: str = "/" + destination_path: Optional[str] = None + override: bool = False + toggle_override: bool = False - plaintext = decrypt_b64(ct, session_keys[0].hex()) +@dataclass +class DeleteSecretOptions: + env_name: str + app_name: str + key_to_delete: str + secret_path: str = "/" - return plaintext +@dataclass +class PhaseSecret: + key: str + value: str + comment: str = "" + path: str = "/" + tags: List[str] = field(default_factory=list) + overridden: bool = False - except ValueError as err: - raise ValueError(f"Something went wrong: {err}") +class Phase: + def __init__(self, init=True, pss=None, host=None): + self._phase_io = PhaseIO(init=init, pss=pss, host=host) + + def get_secret(self, options: GetSecretOptions) -> Optional[PhaseSecret]: + secrets = self._phase_io.get( + env_name=options.env_name, + keys=[options.key_to_find] if options.key_to_find else None, + app_name=options.app_name, + tag=options.tag, + path=options.secret_path + ) + if secrets: + secret = secrets[0] + return PhaseSecret( + key=secret['key'], + value=secret['value'], + comment=secret.get('comment', ''), + path=secret.get('path', '/'), + tags=secret.get('tags', []), + overridden=secret.get('overridden', False) + ) + return None + + def get_all_secrets(self, options: GetAllSecretsOptions) -> List[PhaseSecret]: + secrets = self._phase_io.get( + env_name=options.env_name, + app_name=options.app_name, + tag=options.tag, + path=options.secret_path + ) + return [ + PhaseSecret( + key=secret['key'], + value=secret['value'], + comment=secret.get('comment', ''), + path=secret.get('path', '/'), + tags=secret.get('tags', []), + overridden=secret.get('overridden', False) + ) + for secret in secrets + ] + + def create_secrets(self, options: CreateSecretsOptions) -> str: + # Convert the list of dictionaries to a list of tuples + key_value_tuples = [(list(item.keys())[0], list(item.values())[0]) for item in options.key_value_pairs] + + response = self._phase_io.create( + key_value_pairs=key_value_tuples, + env_name=options.env_name, + app_name=options.app_name, + path=options.secret_path + ) + return "Success" if response.status_code == 200 else f"Error: {response.status_code}" + + def update_secret(self, options: UpdateSecretOptions) -> str: + return self._phase_io.update( + env_name=options.env_name, + key=options.key, + value=options.value, + app_name=options.app_name, + source_path=options.secret_path, + destination_path=options.destination_path, + override=options.override, + toggle_override=options.toggle_override + ) + + def delete_secret(self, options: DeleteSecretOptions) -> List[str]: + return self._phase_io.delete( + env_name=options.env_name, + keys_to_delete=[options.key_to_delete], + app_name=options.app_name, + path=options.secret_path + ) + + def resolve_references(self, secrets: List[PhaseSecret], env_name: str, app_name: str) -> List[PhaseSecret]: + all_secrets = [ + { + 'environment': env_name, + 'application': app_name, + 'key': secret.key, + 'value': secret.value, + 'path': secret.path + } + for secret in secrets + ] + + for secret in secrets: + resolved_value = resolve_all_secrets( + secret.value, + all_secrets, + self._phase_io, + app_name, + env_name + ) + secret.value = resolved_value + + return secrets \ No newline at end of file diff --git a/src/phase/utils/const.py b/src/phase/utils/const.py new file mode 100644 index 0000000..ec3c99f --- /dev/null +++ b/src/phase/utils/const.py @@ -0,0 +1,18 @@ +import os +import re + +__version__ = "2.0.0" +__ph_version__ = "v1" + + +SECRET_REF_REGEX = re.compile(r'\$\{([^}]+)\}') + + +PHASE_CLOUD_API_HOST = "https://console.phase.dev" + +pss_user_pattern = re.compile(r"^pss_user:v(\d+):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64})$") +pss_service_pattern = re.compile(r"^pss_service:v(\d+):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64})$") + +cross_env_pattern = re.compile(r"\$\{(.+?)\.(.+?)\}") +local_ref_pattern = re.compile(r"\$\{([^.]+?)\}") + diff --git a/src/phase/utils/crypto.py b/src/phase/utils/crypto.py index a70ccd9..a008d95 100644 --- a/src/phase/utils/crypto.py +++ b/src/phase/utils/crypto.py @@ -1,161 +1,259 @@ -import requests -import functools import base64 from typing import Tuple -from nacl.bindings import crypto_kx_keypair, crypto_aead_xchacha20poly1305_ietf_encrypt, crypto_aead_xchacha20poly1305_ietf_decrypt, randombytes, crypto_secretbox_NONCEBYTES -from ..version import __version__ - - -def xor_bytes(a, b) -> bytes: - """ - Computes the XOR of two byte arrays byte by byte. - - Args: - a (bytes): The first byte array - b (bytes): The second byte array. - - Returns: - bytes: A byte array representing the XOR of the two input byte arrays. - """ - return bytes(x ^ y for x, y in zip(a, b)) - - -def reconstruct_secret(shares) -> str: - """ - Reconstructs a secret given an array of shares. - - Args: - shares (list): A list of hex-encoded secret shares. - - Returns: - str: The reconstructed secret as a hex-encoded string. - """ - return functools.reduce(xor_bytes, [bytes.fromhex(share) for share in shares]).hex() - - -def random_key_pair() -> Tuple[bytes, bytes]: - """ - Generates a random key exchange keypair. - - Returns: - Tuple[bytes, bytes]: A tuple of two bytes objects representing the public and - private keys of the keypair. - """ - keypair = crypto_kx_keypair() - return keypair - - -def encrypt_raw(plaintext, key) -> bytes: - """ - Encrypts plaintext with the given key and returns the ciphertext with appended nonce - - Args: - plaintext (bytes): Plaintext to be encrypted - key (bytes): The encryption key to be used - - Returns: - bytes: ciphertext + nonce - """ - try: - nonce = randombytes(crypto_secretbox_NONCEBYTES) +import string +from nacl.secret import SecretBox +from typing import List +from nacl.encoding import RawEncoder +import functools +import nacl.bindings +from nacl.encoding import HexEncoder +from nacl.public import PrivateKey +from nacl.bindings import ( + crypto_kx_keypair, + crypto_aead_xchacha20poly1305_ietf_encrypt, + crypto_aead_xchacha20poly1305_ietf_decrypt, + randombytes, + crypto_secretbox_NONCEBYTES, + crypto_kx_server_session_keys, + crypto_kx_client_session_keys, + crypto_kx_seed_keypair, +) +from nacl.hash import blake2b +from nacl.utils import random +from base64 import b64encode, b64decode +from .const import __ph_version__ + + +class CryptoUtils: + VERSION = 1 + + @staticmethod + def random_key_pair() -> Tuple[bytes, bytes]: + """ + Generates a random key exchange keypair. + + Returns: + Tuple[bytes, bytes]: A tuple of two bytes objects representing the public and + private keys of the keypair. + """ + keypair = crypto_kx_keypair() + return keypair + + @staticmethod + def client_session_keys(ephemeral_key_pair, recipient_pub_key): + client_public_key, client_private_key = ephemeral_key_pair + return nacl.bindings.crypto_kx_client_session_keys( + client_public_key, client_private_key, recipient_pub_key + ) + + @staticmethod + def server_session_keys(app_key_pair, data_pub_key): + server_public_key, server_private_key = app_key_pair + return nacl.bindings.crypto_kx_server_session_keys( + server_public_key, server_private_key, data_pub_key + ) + + @staticmethod + def encrypt_asymmetric(plaintext, public_key_hex): + public_key, private_key = CryptoUtils.random_key_pair() + + symmetric_keys = CryptoUtils.client_session_keys( + (public_key, private_key), bytes.fromhex(public_key_hex) + ) + + ciphertext = CryptoUtils.encrypt_string(plaintext, symmetric_keys[1]) + + return f"ph:v{CryptoUtils.VERSION}:{public_key.hex()}:{ciphertext}" + + @staticmethod + def decrypt_asymmetric(ciphertext_string, private_key_hex, public_key_hex): + ciphertext_segments = ciphertext_string.split(":") + + if len(ciphertext_segments) != 4: + raise ValueError("Invalid ciphertext") + + public_key = bytes.fromhex(public_key_hex) + private_key = bytes.fromhex(private_key_hex) + + session_keys = CryptoUtils.server_session_keys( + (public_key, private_key), bytes.fromhex(ciphertext_segments[2]) + ) + + plaintext = CryptoUtils.decrypt_string(ciphertext_segments[3], session_keys[0]) + + return plaintext + + @staticmethod + def digest(input): + hash = blake2b(input.encode(), encoder=nacl.encoding.RawEncoder) + return base64.b64encode(hash).decode() + + @staticmethod + def encrypt_raw(plaintext, key): + nonce = random(nacl.bindings.crypto_secretbox_NONCEBYTES) ciphertext = crypto_aead_xchacha20poly1305_ietf_encrypt( - plaintext, None, nonce, key) - return ciphertext + nonce - except Exception: - raise ValueError('Encryption error') - - -def encrypt_b64(plaintext, key_bytes) -> str: + plaintext.encode(), None, nonce, key + ) + return bytearray(ciphertext + nonce) + + @staticmethod + def decrypt_raw(ct, key) -> bytes: + try: + nonce = ct[-24:] + ciphertext = ct[:-24] + plaintext_bytes = crypto_aead_xchacha20poly1305_ietf_decrypt( + ciphertext, None, nonce, key + ) + return plaintext_bytes + except Exception as e: + print(f"Exception during decryption: {e}") + raise ValueError("Decryption error") from e + + @staticmethod + def encrypt_b64(plaintext, key_bytes) -> str: + """ + Encrypts a string using a key. Returns ciphertext as a base64 string + + Args: + plaintext (str): The plaintext to encrypt. + key (bytes): The key to use for encryption. + + Returns: + str: The ciphertext obtained by encrypting the string with the key, encoded with base64. + """ + + plaintext_bytes = bytes(plaintext, "utf-8") + ciphertext = CryptoUtils.encrypt_raw(plaintext_bytes, key_bytes) + return base64.b64encode(ciphertext).decode("utf-8") + + @staticmethod + def decrypt_b64(ct, key) -> bytes: + """ + Decrypts a base64 ciphertext using a key. + + Args: + ct (str): The ciphertext to decrypt, as a base64 string. + key (str): The key to use for decryption, as a hexadecimal string. + + Returns: + str: The plaintext obtained by decrypting the ciphertext with the key. + """ + + ct_bytes = base64.b64decode(ct) + key_bytes = bytes.fromhex(key) + + plaintext_bytes = CryptoUtils.decrypt_raw(ct_bytes, key_bytes) + + return plaintext_bytes.decode("utf-8") + + @staticmethod + def encrypt_string(plaintext, key): + return base64.b64encode(CryptoUtils.encrypt_raw(plaintext, key)).decode() + + @staticmethod + def decrypt_string(cipherText, key): + return CryptoUtils.decrypt_raw(base64.b64decode(cipherText), key).decode() + + @staticmethod + def env_keypair(env_seed: str): + """ + Derives an env keyring from the given seed + + :param env_seed: Env seed as a hex string + :return: A dictionary containing the public and private keys in hex format + """ + + # Convert the hex seed to bytes + seed_bytes = bytes.fromhex(env_seed) + + # Generate the key pair + public_key, private_key = nacl.bindings.crypto_kx_seed_keypair(seed_bytes) + + # Convert the keys to hex format + public_key_hex = public_key.hex() + private_key_hex = private_key.hex() + + # Return the keys in a dictionary + return {"publicKey": public_key_hex, "privateKey": private_key_hex} + + @staticmethod + def blake2b_digest(input_str: str, salt: str) -> str: + """ + Generate a BLAKE2b hash of the input string with a salt. + + Args: + input_str (str): The input string to be hashed. + salt (str): The salt (key) used for hashing. + + Returns: + str: The hexadecimal representation of the hash. + """ + hash_size = 32 # 32 bytes (256 bits) + hashed = blake2b( + input_str.encode("utf-8"), + key=salt.encode("utf-8"), + encoder=nacl.encoding.RawEncoder, + digest_size=hash_size, + ) + hex_encoded = hashed.hex() + return hex_encoded + + @staticmethod + def xor_bytes(a, b) -> bytes: + """ + Computes the XOR of two byte arrays byte by byte. + + Args: + a (bytes): The first byte array + b (bytes): The second byte array. + + Returns: + bytes: A byte array representing the XOR of the two input byte arrays. + """ + return bytes(x ^ y for x, y in zip(a, b)) + + @staticmethod + def reconstruct_secret(shares) -> str: + """ + Reconstructs a secret given an array of shares. + + Args: + shares (list): A list of hex-encoded secret shares. + + Returns: + str: The reconstructed secret as a hex-encoded string. + """ + return functools.reduce( + CryptoUtils.xor_bytes, [bytes.fromhex(share) for share in shares] + ).hex() + + +def generate_random_secret(type='hex', length=32): """ - Encrypts a string using a key. Returns ciphertext as a base64 string + Generates a random secret based on the specified type and length. Args: - plaintext (str): The plaintext to encrypt. - key (bytes): The key to use for encryption. + type (str): Type of secret to generate ('alphanumeric', 'hex', 'base64', 'base64url', 'key128', 'key256'). + length (int): Length of the secret (1 to 256). For 'key128' and 'key256', length is ignored. Returns: - str: The ciphertext obtained by encrypting the string with the key, encoded with base64. + str: The generated secret. """ - - plaintext_bytes = bytes(plaintext, 'utf-8') - ciphertext = encrypt_raw(plaintext_bytes, key_bytes) - return base64.b64encode(ciphertext).decode('utf-8') - - -def decrypt_raw(ct, key) -> bytes: - """ - Decrypts a ciphertext using a key. - - Args: - ct (bytes): The ciphertext to decrypt. - key (bytes): The key to use for decryption, as a hexadecimal string. - - Returns: - bytes: The plaintext obtained by decrypting the ciphertext with the key. - """ - - try: - nonce = ct[-24:] - ciphertext = ct[:-24] - - plaintext_bytes = crypto_aead_xchacha20poly1305_ietf_decrypt( - ciphertext, None, nonce, key) - - return plaintext_bytes - except Exception: - raise ValueError('Decryption error') - - -def decrypt_b64(ct, key) -> bytes: - """ - Decrypts a base64 ciphertext using a key. - - Args: - ct (str): The ciphertext to decrypt, as a base64 string. - key (str): The key to use for decryption, as a hexadecimal string. - - Returns: - str: The plaintext obtained by decrypting the ciphertext with the key. - """ - - ct_bytes = base64.b64decode(ct) - key_bytes = bytes.fromhex(key) - - plaintext_bytes = decrypt_raw(ct_bytes, key_bytes) - - return plaintext_bytes.decode('utf-8') - - -def fetch_app_key(appToken, wrapKey, appId, dataSize, host) -> str: - """ - Fetches the application key share from Phase KMS. - - Args: - appToken (str): The token for the application to retrieve the key for. - wrapKey (str): The key used to encrypt the wrapped key share. - appId (str): The identifier for the application to retrieve the key for. - dataSize (int): The size of the data to be decrypted. - - Returns: - str: The unwrapped share obtained by decrypting the wrapped key share. - Raises: - Exception: If the app token is invalid (HTTP status code 404). - """ - - headers = { - "Authorization": f"Bearer {appToken}", - "EventType": "decrypt", - "PhaseNode": f"python:{__version__}", - "PhSize": f"{dataSize}" - } - - response = requests.get(f"{host}/{appId}", headers=headers) - - if response.status_code == 404: - raise Exception("Invalid app token") + if not 1 <= length <= 256: + raise ValueError("Length must be between 1 and 256.") + + if type == 'alphanumeric': + chars = string.ascii_letters + string.digits + return ''.join(chars[byte % len(chars)] for byte in nacl.utils.random(length)) + elif type == 'hex': + return nacl.utils.random(length).hex() + elif type == 'base64': + return base64.b64encode(nacl.utils.random(length)).decode() + elif type == 'base64url': + return base64.urlsafe_b64encode(nacl.utils.random(length)).decode() + elif type == 'key128': + return base64.b64encode(nacl.utils.random(SecretBox.KEY_SIZE // 2)).decode() + elif type == 'key256': + return base64.b64encode(nacl.utils.random(SecretBox.KEY_SIZE)).decode() else: - json_data = response.json() - wrapped_key_share = json_data["wrappedKeyShare"] - unwrapped_key = decrypt_raw(bytes.fromhex( - wrapped_key_share), bytes.fromhex(wrapKey)) - return unwrapped_key.decode("utf-8") + raise ValueError("Invalid secret type specified.") diff --git a/src/phase/utils/exceptions.py b/src/phase/utils/exceptions.py new file mode 100644 index 0000000..128d292 --- /dev/null +++ b/src/phase/utils/exceptions.py @@ -0,0 +1,7 @@ +class EnvironmentNotFoundException(Exception): + def __init__(self, env_name): + super().__init__(f"⚠️\u200A Warning: The environment '{env_name}' either does not exist or you do not have access to it.") + +class OverrideNotFoundException(Exception): + def __init__(self, key): + super().__init__(f"No override exists for this secret. To set one, run: phase secrets update {key} --override") diff --git a/src/phase/utils/misc.py b/src/phase/utils/misc.py new file mode 100644 index 0000000..cd5c4c6 --- /dev/null +++ b/src/phase/utils/misc.py @@ -0,0 +1,169 @@ +import os +import platform +import subprocess +import webbrowser +import getpass +import json +from .exceptions import EnvironmentNotFoundException +from urllib.parse import urlparse +from typing import Union, List +from .const import __version__, PHASE_CLOUD_API_HOST, cross_env_pattern, local_ref_pattern + + +def get_default_user_token() -> str: + """ + Fetch the default user's personal access token from the config file in PHASE_SECRETS_DIR. + + Returns: + - str: The default user's personal access token. + + Raises: + - ValueError: If the config file is not found, the default user's ID is missing, or the token is not set. + """ + config_file_path = os.path.join(PHASE_SECRETS_DIR, 'config.json') + + if not os.path.exists(config_file_path): + raise ValueError("Config file not found. Please login with phase auth or supply a PHASE_SERVICE_TOKEN as an environment variable.") + + with open(config_file_path, 'r') as f: + config_data = json.load(f) + + default_user_id = config_data.get("default-user") + if not default_user_id: + raise ValueError("Default user ID is missing in the config file.") + + for user in config_data.get("phase-users", []): + if user['id'] == default_user_id: + token = user.get("token") + if not token: + raise ValueError(f"Token for the default user (ID: {default_user_id}) is not found in the config file.") + return token + + raise ValueError("Default user not found in the config file.") + + +def phase_get_context(user_data, app_name=None, env_name=None): + """ + Get the context (ID, name, and publicKey) for a specified application and environment or the default application and environment. + + Parameters: + - user_data (dict): The user data from the API response. + - app_name (str, optional): The name (or partial name) of the desired application. + - env_name (str, optional): The name (or partial name) of the desired environment. + + Returns: + - tuple: A tuple containing the application's name, application's ID, environment's name, environment's ID, and publicKey. + + Raises: + - ValueError: If no matching application or environment is found. + """ + + # 2. If env_name isn't explicitly provided, use the default + default_env_name = "Development" + app_id = None + env_name = env_name or default_env_name + + # 3. Match the application using app_id or find the best match for partial app_name + try: + if app_name: + matching_apps = [app for app in user_data["apps"] if app_name.lower() in app["name"].lower()] + if not matching_apps: + raise ValueError(f"🔍 No application found with the name '{app_name}'.") + # Sort matching applications by the length of their names, shorter names are likely to be more specific matches + matching_apps.sort(key=lambda app: len(app["name"])) + application = matching_apps[0] + elif app_id: + application = next((app for app in user_data["apps"] if app["id"] == app_id), None) + if not application: + raise ValueError(f"🔍 No application found with the name '{app_name_from_config}' and ID: '{app_id}'.") + else: + raise ValueError("🤔 No application context provided. Please run 'phase init' or pass the '--app' flag followed by your application name.") + + # 4. Attempt to match environment with the exact name or a name that contains the env_name string + environment = next((env for env in application["environment_keys"] if env_name.lower() in env["environment"]["name"].lower()), None) + + if not environment: + raise EnvironmentNotFoundException(env_name) + + # Return application name, application ID, environment name, environment ID, and public key + return (application["name"], application["id"], environment["environment"]["name"], environment["environment"]["id"], environment["identity_key"]) + except StopIteration: + raise ValueError("🔍 Application or environment not found.") + + +def normalize_tag(tag): + """ + Normalize a tag by replacing underscores with spaces. + + Args: + tag (str): The tag to normalize. + + Returns: + str: The normalized tag. + """ + return tag.replace('_', ' ').lower() + + +def tag_matches(secret_tags, user_tag): + """ + Check if the user-provided tag partially matches any of the secret tags. + + Args: + secret_tags (list): The list of tags associated with a secret. + user_tag (str): The user-provided tag to match. + + Returns: + bool: True if there's a partial match, False otherwise. + """ + normalized_user_tag = normalize_tag(user_tag) + for tag in secret_tags: + normalized_secret_tag = normalize_tag(tag) + if normalized_user_tag in normalized_secret_tag: + return True + return False + + +def get_user_agent(): + """ + Constructs a user agent string containing information about the CLI's version, + the operating system, its version, its architecture, and the local username with machine name. + + Returns: + str: The constructed user agent string. + """ + + details = [] + + # Get CLI version + try: + cli_version = f"phase-python-sdk/{__version__}" + details.append(cli_version) + except: + pass + + # Get OS and version + try: + os_type = platform.system() # e.g., Windows, Linux, Darwin (for macOS) + os_version = platform.release() + details.append(f"{os_type} {os_version}") + except: + pass + + # Get architecture + try: + architecture = platform.machine() + details.append(architecture) + except: + pass + + # Get username and hostname + try: + username = getpass.getuser() + hostname = platform.node() + user_host_string = f"{username}@{hostname}" + details.append(user_host_string) + except: + pass + + user_agent_str = ' '.join(details) + return user_agent_str \ No newline at end of file diff --git a/src/phase/utils/network.py b/src/phase/utils/network.py new file mode 100644 index 0000000..0e4872b --- /dev/null +++ b/src/phase/utils/network.py @@ -0,0 +1,309 @@ +import os +import requests +from .misc import get_user_agent +from typing import List +from typing import Dict +import json + +# Check if SSL verification should be skipped +VERIFY_SSL = os.environ.get('PHASE_VERIFY_SSL', 'True').lower() != 'false' + +# Check if debug mode is enabled +PHASE_DEBUG = os.environ.get('PHASE_DEBUG', 'False').lower() == 'true' + +# Suppress InsecureRequestWarning if SSL verification is skipped +if not VERIFY_SSL: + requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) + + +def handle_request_errors(response: requests.Response) -> None: + """ + Check the HTTP status code of a response and print the error if the status code is not 200. + + Args: + response (requests.Response): The HTTP response to check. + """ + if response.status_code == 403: + print("🚫 Not authorized. Token expired or revoked.") + return + + if response.status_code != 200: + try: + error_details = json.loads(response.text).get('error', 'Unknown error') + except json.JSONDecodeError: + error_details = 'Unknown error' + if PHASE_DEBUG: + error_details += f" (Raw response: {response.text})" + + error_message = f"🗿 Request failed with status code {response.status_code}: {error_details}" + raise Exception(error_message) + + +def handle_connection_error(e: Exception) -> None: + """ + Handle ConnectionError exceptions. + + Args: + e (Exception): The exception to handle. + """ + error_message = "🗿 Network error: Please check your internet connection." + if PHASE_DEBUG: + error_message += f" Detail: {str(e)}" + raise Exception(error_message) + + +def handle_ssl_error(e: Exception) -> None: + """ + Handle SSLError exceptions. + + Args: + e (Exception): The exception to handle. + """ + error_message = "🗿 SSL error: The Phase Console is using an invalid/expired or a self-signed certificate." + if PHASE_DEBUG: + error_message += f" Detail: {str(e)}" + raise Exception(error_message) + + +def construct_http_headers(token_type: str, app_token: str) -> Dict[str, str]: + """ + Construct common headers used for HTTP requests. + + Args: + token_type (str): The type of token being used. + app_token (str): The token for the application. + + Returns: + Dict[str, str]: The common headers including User-Agent. + """ + return { + "Authorization": f"Bearer {token_type.capitalize()} {app_token}", + "User-Agent": get_user_agent() + } + + +def fetch_phase_user(token_type: str, app_token: str, host: str) -> requests.Response: + """ + Fetch users from the Phase API. + + Args: + app_token (str): The token for the application. + + Returns: + requests.Response: The HTTP response from the Phase API. + """ + + headers = construct_http_headers(token_type, app_token) + + URL = f"{host}/service/secrets/tokens/" + + try: + response = requests.get(URL, headers=headers, verify=VERIFY_SSL) + handle_request_errors(response) + return response + except requests.exceptions.ConnectionError as e: + handle_connection_error(e) + except requests.exceptions.SSLError as e: + handle_ssl_error(e) + +def fetch_app_key(token_type: str, app_token, host) -> str: + """ + Fetches the application key share from Phase API. + + Args: + app_token (str): The token for the application to retrieve the key for. + token_type (str): The type of token being used, either "user" or "service". Defaults to "user". + + Returns: + str: The wrapped key share. + Raises: + Exception: If the app token is invalid (HTTP status code 404). + """ + + headers = construct_http_headers(token_type, app_token) + + URL = f"{host}/service/secrets/tokens/" + + response = requests.get(URL, headers=headers) + + if response.status_code != 200: + raise ValueError(f"Request failed with status code {response.status_code}: {response.text}") + + if not response.text: + raise ValueError("The response body is empty!") + + try: + json_data = response.json() + except requests.exceptions.JSONDecodeError: + raise ValueError(f"Failed to decode JSON from response: {response.text}") + + wrapped_key_share = json_data.get("wrapped_key_share") + if not wrapped_key_share: + raise ValueError("Wrapped key share not found in the response!") + + return wrapped_key_share + + +def fetch_wrapped_key_share(token_type: str, app_token: str, host: str) -> str: + """ + Fetches the wrapped application key share from Phase API. + + Args: + token_type (str): The type of token being used, either "user" or "service". + app_token (str): The token for the application to retrieve the key for. + host (str): The host for the API call. + + Returns: + str: The wrapped key share. + + Raises: + ValueError: If any errors occur during the fetch operation. + """ + + headers = construct_http_headers(token_type, app_token) + + URL = f"{host}/service/secrets/tokens/" + + response = requests.get(URL, headers=headers) + + if response.status_code != 200: + raise ValueError(f"Request failed with status code {response.status_code}: {response.text}") + + if not response.text: + raise ValueError("The response body is empty!") + + try: + json_data = response.json() + except requests.exceptions.JSONDecodeError: + raise ValueError(f"Failed to decode JSON from response: {response.text}") + + wrapped_key_share = json_data.get("wrapped_key_share") + if not wrapped_key_share: + raise ValueError("Wrapped key share not found in the response!") + + return wrapped_key_share + + +def fetch_phase_secrets(token_type: str, app_token: str, id: str, host: str, key_digest: str = '', path: str = '') -> requests.Response: + """ + Fetch a single secret from Phase API based on key digest, with an optional path parameter. + + Args: + token_type (str): The type of the token. + app_token (str): The token for the application. + id (str): The environment ID. + host (str): The host URL. + key_digest (str): The digest of the key to fetch. + path (str, optional): A specific path to fetch secrets from. + + Returns: + dict: The single secret fetched from the Phase API, or an error message. + """ + + headers = {**construct_http_headers(token_type, app_token), "Environment": id, "KeyDigest": key_digest} + if path: + headers["Path"] = path + + URL = f"{host}/service/secrets/" + + try: + response = requests.get(URL, headers=headers, verify=VERIFY_SSL) + handle_request_errors(response) + return response + except requests.exceptions.ConnectionError as e: + handle_connection_error(e) + except requests.exceptions.SSLError as e: + handle_ssl_error(e) + + +def create_phase_secrets(token_type: str, app_token: str, environment_id: str, secrets: List[dict], host: str) -> requests.Response: + """ + Create secrets in Phase API through HTTP POST request. + + Args: + app_token (str): The token for the application. + environment_id (str): The environment ID. + secrets (List[dict]): The list of secrets to be created. + + Returns: + requests.Response: The HTTP response from the Phase API. + """ + + headers = {**construct_http_headers(token_type, app_token), "Environment": environment_id} + + data = { + "secrets": secrets + } + + URL = f"{host}/service/secrets/" + + try: + response = requests.post(URL, headers=headers, json=data, verify=VERIFY_SSL) + handle_request_errors(response) + return response + except requests.exceptions.ConnectionError as e: + handle_connection_error(e) + except requests.exceptions.SSLError as e: + handle_ssl_error(e) + + +def update_phase_secrets(token_type: str, app_token: str, environment_id: str, secrets: List[dict], host: str) -> requests.Response: + """ + Update secrets in Phase API through HTTP PUT request. + + Args: + app_token (str): The token for the application. + environment_id (str): The environment ID. + secrets (List[dict]): The list of secrets to be updated. + + Returns: + requests.Response: The HTTP response from the Phase API. + """ + + headers = {**construct_http_headers(token_type, app_token), "Environment": environment_id} + + data = { + "secrets": secrets + } + + URL = f"{host}/service/secrets/" + + try: + response = requests.put(URL, headers=headers, json=data, verify=VERIFY_SSL) + handle_request_errors(response) + return response + except requests.exceptions.ConnectionError as e: + handle_connection_error(e) + except requests.exceptions.SSLError as e: + handle_ssl_error(e) + + +def delete_phase_secrets(token_type: str, app_token: str, environment_id: str, secret_ids: List[str], host: str) -> requests.Response: + """ + Delete secrets from Phase API. + + Args: + app_token (str): The token for the application. + environment_id (str): The environment ID. + secret_ids (List[str]): The list of secret IDs to be deleted. + + Returns: + requests.Response: The HTTP response from the Phase API. + """ + + headers = {**construct_http_headers(token_type, app_token), "Environment": environment_id} + + data = { + "secrets": secret_ids + } + + URL = f"{host}/service/secrets/" + + try: + response = requests.delete(URL, headers=headers, json=data, verify=VERIFY_SSL) + handle_request_errors(response) + return response + except requests.exceptions.ConnectionError as e: + handle_connection_error(e) + except requests.exceptions.SSLError as e: + handle_ssl_error(e) diff --git a/src/phase/utils/phase_io.py b/src/phase/utils/phase_io.py new file mode 100644 index 0000000..677b3ef --- /dev/null +++ b/src/phase/utils/phase_io.py @@ -0,0 +1,429 @@ +import requests +from typing import Tuple +from typing import List, Dict +from dataclasses import dataclass +from .network import ( + fetch_phase_user, + fetch_app_key, + fetch_wrapped_key_share, + fetch_phase_secrets, + create_phase_secrets, + update_phase_secrets, + delete_phase_secrets +) +from nacl.bindings import ( + crypto_kx_server_session_keys, +) +from .crypto import CryptoUtils +from .const import __ph_version__, pss_user_pattern, pss_service_pattern +from .misc import phase_get_context, normalize_tag, tag_matches +from .secret_referencing import resolve_all_secrets + + +@dataclass +class AppSecret: + prefix: str + pes_version: str + app_token: str + pss_user_public_key: str + keyshare0: str + keyshare1_unwrap_key: str + + +class Phase: + _app_pub_key = '' + _api_host = '' + _app_secret = None + + + def __init__(self, init=True, pss=None, host=None): + """ + Initializes the Phase class with optional parameters. + + Parameters: + - init (bool): Whether to initialize using default methods or use provided parameters. + - pss (str): The Phase user token. Used if init is False. + - host (str): The host URL. Used if init is False. + """ + + app_secret = pss + self._api_host = host + + # Determine the type of the token (service token or user token) + self.is_service_token = pss_service_pattern.match(app_secret) is not None + self.is_user_token = pss_user_pattern.match(app_secret) is not None + + # If it's neither a service token nor a user token, raise an error + if not self.is_service_token and not self.is_user_token: + token_type = "service token" if "pss_service" in app_secret else "user token" + raise ValueError(f"Invalid Phase {token_type}") + + # Storing the token type as a string for easier access + self._token_type = "service" if self.is_service_token else "user" + + pss_segments = app_secret.split(':') + self._app_secret = AppSecret(*pss_segments) + + + def _find_matching_environment_key(self, user_data, env_id): + for app in user_data.get("apps", []): + for environment_key in app.get("environment_keys", []): + if environment_key["environment"]["id"] == env_id: + return environment_key + return None + + + def auth(self): + try: + key = fetch_app_key( + self._token_type, self._app_secret.app_token, self._api_host) + + return "Success" + + except ValueError as err: + raise ValueError(f"Invalid Phase credentials") + + + def init(self): + response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) + + # Ensure the response is OK + if response.status_code != 200: + raise ValueError(f"Request failed with status code {response.status_code}: {response.text}") + + # Parse and return the JSON content + return response.json() + + + def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name: str, path: str = '/', override_value: str = None) -> requests.Response: + """ + Create secrets in Phase KMS with support for specifying a path and overrides. + + Args: + key_value_pairs (List[Tuple[str, str]]): List of tuples where each tuple contains a key and a value. + env_name (str): The name (or partial name) of the desired environment. + app_name (str): The name of the application context. + path (str, optional): The path under which to store the secrets. Defaults to the root path '/'. + override_value (str, optional): The overridden value for the secret. Defaults to None. + + Returns: + requests.Response: The HTTP response from the Phase KMS. + """ + user_response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) + if user_response.status_code != 200: + raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") + + user_data = user_response.json() + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + + environment_key = self._find_matching_environment_key(user_data, env_id) + if environment_key is None: + raise ValueError(f"No environment found with id: {env_id}") + + wrapped_salt = environment_key.get("wrapped_salt") + decrypted_salt = self.decrypt(wrapped_salt) + + secrets = [] + for key, value in key_value_pairs: + encrypted_key = CryptoUtils.encrypt_asymmetric(key, public_key) + encrypted_value = CryptoUtils.encrypt_asymmetric(value, public_key) + key_digest = CryptoUtils.blake2b_digest(key, decrypted_salt) + + secret = { + "key": encrypted_key, + "keyDigest": key_digest, + "value": encrypted_value, + "path": path, + "tags": [], # TODO: Implement tags and comments creation + "comment": "" + } + + if override_value: + encrypted_override_value = CryptoUtils.encrypt_asymmetric(override_value, public_key) + secret["override"] = { + "value": encrypted_override_value, + "isActive": True + } + + secrets.append(secret) + + return create_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secrets, self._api_host) + + + def get(self, env_name: str, keys: List[str] = None, app_name: str = None, tag: str = None, path: str = '') -> List[Dict]: + """ + Get secrets from Phase KMS based on key and environment, with support for personal overrides, + optional tag matching, decrypting comments, and now including path support and key digest optimization. + + Args: + env_name (str): The name (or partial name) of the desired environment. + keys (List[str], optional): The keys for which to retrieve the secret values. + app_name (str, optional): The name of the desired application. + tag (str, optional): The tag to match against the secrets. + path (str, optional): The path under which to fetch secrets, default is root. + + Returns: + List[Dict]: A list of dictionaries for all secrets in the environment that match the criteria, including their paths. + """ + + user_response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) + if user_response.status_code != 200: + raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") + + user_data = user_response.json() + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + + environment_key = self._find_matching_environment_key(user_data, env_id) + if environment_key is None: + raise ValueError("No environment found with id: {}".format(env_id)) + + wrapped_seed = environment_key.get("wrapped_seed") + decrypted_seed = self.decrypt(wrapped_seed) + key_pair = CryptoUtils.env_keypair(decrypted_seed) + env_private_key = key_pair['privateKey'] + + params = {"path": path} + if keys and len(keys) == 1: + wrapped_salt = environment_key.get("wrapped_salt") + decrypted_salt = self.decrypt(wrapped_salt) + key_digest = CryptoUtils.blake2b_digest(keys[0], decrypted_salt) + params["key_digest"] = key_digest + + secrets_response = fetch_phase_secrets(self._token_type, self._app_secret.app_token, env_id, self._api_host, **params) + + secrets_data = secrets_response.json() + + results = [] + for secret in secrets_data: + # Check if a tag filter is applied and if the secret has the correct tags. + if tag and not tag_matches(secret.get("tags", []), tag): + continue + + secret_id = secret["id"] + override = secret.get("override") + # Check if the override exists and is active. + use_override = override and override.get("is_active") + + key_to_decrypt = secret["key"] + # Select the correct value based on override status. + value_to_decrypt = override["value"] if use_override else secret["value"] + comment_to_decrypt = secret["comment"] + + decrypted_key = CryptoUtils.decrypt_asymmetric(key_to_decrypt, env_private_key, public_key) + decrypted_value = CryptoUtils.decrypt_asymmetric(value_to_decrypt, env_private_key, public_key) + decrypted_comment = CryptoUtils.decrypt_asymmetric(comment_to_decrypt, env_private_key, public_key) if comment_to_decrypt else None + + result = { + "key": decrypted_key, + "value": decrypted_value, + "overridden": use_override, + "tags": secret.get("tags", []), + "comment": decrypted_comment, + "path": secret.get("path", "/"), + "application": app_name, + "environment": env_name + } + + # Only add the secret to results if the requested keys are not specified or the decrypted key is one of the requested keys. + if not keys or decrypted_key in keys: + results.append(result) + + return results + + + def update(self, env_name: str, key: str, value: str = None, app_name: str = None, source_path: str = '', destination_path: str = None, override: bool = False, toggle_override: bool = False) -> str: + """ + Update a secret in Phase KMS based on key and environment, with support for source and destination paths. + + Args: + env_name (str): The name (or partial name) of the desired environment. + key (str): The key for which to update the secret value. + value (str, optional): The new value for the secret. Defaults to None. + app_name (str, optional): The name of the desired application. + source_path (str, optional): The current path of the secret. Defaults to root path '/'. + destination_path (str, optional): The new path for the secret, if changing its location. If not provided, the path is not updated. + override (bool, optional): Whether to update an overridden secret value. Defaults to False. + toggle_override (bool, optional): Whether to toggle the override state between active and inactive. Defaults to False. + + Returns: + str: A message indicating the outcome of the update operation. + """ + + user_response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) + if user_response.status_code != 200: + raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") + + user_data = user_response.json() + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + + environment_key = self._find_matching_environment_key(user_data, env_id) + if environment_key is None: + raise ValueError(f"No environment found with id: {env_id}") + + # Fetch secrets from the specified source path + secrets_response = fetch_phase_secrets(self._token_type, self._app_secret.app_token, env_id, self._api_host, path=source_path) + secrets_data = secrets_response.json() + + wrapped_seed = environment_key.get("wrapped_seed") + decrypted_seed = self.decrypt(wrapped_seed) + key_pair = CryptoUtils.env_keypair(decrypted_seed) + env_private_key = key_pair['privateKey'] + + matching_secret = next((secret for secret in secrets_data if CryptoUtils.decrypt_asymmetric(secret["key"], env_private_key, public_key) == key), None) + if not matching_secret: + return f"Key '{key}' doesn't exist in path '{source_path}'." + + encrypted_key = CryptoUtils.encrypt_asymmetric(key, public_key) + encrypted_value = CryptoUtils.encrypt_asymmetric(value or "", public_key) + + wrapped_salt = environment_key.get("wrapped_salt") + decrypted_salt = self.decrypt(wrapped_salt) + key_digest = CryptoUtils.blake2b_digest(key, decrypted_salt) + + # Determine secret value to be updated - only update shared value if not overriding or toggling + if not override and not toggle_override: + payload_value = encrypted_value + else: + payload_value = matching_secret["value"] + + secret_update_payload = { + "id": matching_secret["id"], + "key": encrypted_key, + "keyDigest": key_digest, + "value": payload_value, + "tags": matching_secret.get("tags", []), # TODO: Implement tags and comments updates + "comment": matching_secret.get("comment", ""), + "path": destination_path if destination_path is not None else matching_secret["path"] + } + + if toggle_override: + # Check if the secret has an existing override. If not, raise a custom exception. + # This prevents toggling an override on a secret that doesn't have one. + if "override" not in matching_secret or matching_secret["override"] is None: + raise OverrideNotFoundException(key) + + # Retrieve the current override state. If the override is not active, it defaults to False. + current_override_state = matching_secret["override"].get("is_active", False) + + # Prepare the payload to update the override status. The value of the override remains unchanged, + # but the isActive status is toggled. + secret_update_payload["override"] = { + "value": matching_secret["override"]["value"], + "isActive": not current_override_state # Toggle the current state. + } + elif override: + # If the override flag is set, check if there is an existing override. + if matching_secret["override"] is None: + # If no override exists, create a new override entry. + # The value for the override is the encrypted value provided by the user, + # and the override is activated by default. + secret_update_payload["override"] = { + "value": encrypted_value, + "isActive": True + } + else: + # If an override already exists, update its value. + # If a new value is provided, use it; otherwise, retain the existing override value. + # The isActive status of the override is also retained from the existing override. + secret_update_payload["override"] = { + "value": encrypted_value if value is not None else matching_secret["override"]["value"], + "isActive": matching_secret["override"].get("is_active", True) + } + + response = update_phase_secrets(self._token_type, self._app_secret.app_token, env_id, [secret_update_payload], self._api_host) + + if response.status_code == 200: + return "Success" + else: + return f"Error: Failed to update secret. HTTP Status Code: {response.status_code}" + + + def delete(self, env_name: str, keys_to_delete: List[str], app_name: str = None, path: str = None) -> List[str]: + """ + Delete secrets in Phase KMS based on keys and environment, with optional path support. + + Args: + env_name (str): The name (or partial name) of the desired environment. + keys_to_delete (List[str]): The keys for which to delete the secrets. + app_name (str, optional): The name of the desired application. + path (str, optional): The path within which to delete the secrets. If specified, only deletes secrets within this path. + + Returns: + List[str]: A list of keys that were not found and could not be deleted. + """ + + user_response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) + if user_response.status_code != 200: + raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") + + user_data = user_response.json() + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + + environment_key = self._find_matching_environment_key(user_data, env_id) + if environment_key is None: + raise ValueError(f"No environment found with id: {env_id}") + + wrapped_seed = environment_key.get("wrapped_seed") + decrypted_seed = self.decrypt(wrapped_seed) + key_pair = CryptoUtils.env_keypair(decrypted_seed) + env_private_key = key_pair['privateKey'] + + secret_ids_to_delete = [] + keys_not_found = [] + secrets_response = fetch_phase_secrets(self._token_type, self._app_secret.app_token, env_id, self._api_host, path=path) + secrets_data = secrets_response.json() + + for key in keys_to_delete: + found = False + for secret in secrets_data: + if path is not None and secret.get("path", "/") != path: + continue # Skip secrets not in the specified path + decrypted_key = CryptoUtils.decrypt_asymmetric(secret["key"], env_private_key, public_key) + if decrypted_key == key: + secret_ids_to_delete.append(secret["id"]) + found = True + break + if not found: + keys_not_found.append(key) + + if secret_ids_to_delete: + delete_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secret_ids_to_delete, self._api_host) + + return keys_not_found + + + def decrypt(self, phase_ciphertext) -> str | None: + """ + Decrypts a Phase ciphertext string. + + Args: + phase_ciphertext (str): The encrypted message to decrypt. + + Returns: + str: The decrypted plaintext as a string. Returns `None` if an error occurs. + + Raises: + ValueError: If the ciphertext is not in the expected format (e.g. wrong prefix, wrong number of fields). + """ + try: + [prefix, version, client_pub_key_hex, ct] = phase_ciphertext.split(':') + if prefix != 'ph' or len(phase_ciphertext.split(':')) != 4: + raise ValueError('Ciphertext is invalid') + client_pub_key = bytes.fromhex(client_pub_key_hex) + + wrapped_key_share = fetch_wrapped_key_share( + self._token_type, self._app_secret.app_token, self._api_host) + keyshare1 = CryptoUtils.decrypt_raw(bytes.fromhex(wrapped_key_share), bytes.fromhex(self._app_secret.keyshare1_unwrap_key)).decode("utf-8") + + app_priv_key = CryptoUtils.reconstruct_secret( + [self._app_secret.keyshare0, keyshare1]) + + session_keys = crypto_kx_server_session_keys(bytes.fromhex( + self._app_secret.pss_user_public_key), bytes.fromhex(app_priv_key), client_pub_key) + + plaintext = CryptoUtils.decrypt_b64(ct, session_keys[0].hex()) + + return plaintext + + except ValueError as err: + raise ValueError(f"Something went wrong: {err}") diff --git a/src/phase/utils/secret_referencing.py b/src/phase/utils/secret_referencing.py new file mode 100644 index 0000000..cb7fb12 --- /dev/null +++ b/src/phase/utils/secret_referencing.py @@ -0,0 +1,163 @@ +import re +from typing import Dict, List +from .exceptions import EnvironmentNotFoundException +from .const import SECRET_REF_REGEX + +""" + Secret Referencing Syntax: + + This documentation explains the syntax used for referencing secrets within the configuration. + Secrets can be referenced both locally (within the same environment) and across different environments, + with or without specifying a path. + + Syntax Patterns: + + 1. Local Reference (Root Path): + Syntax: `${KEY}` + - Environment: Same as the current environment. + - Path: Root path (`/`). + - Secret Key: `KEY` + - Description: References a secret named `KEY` in the root path of the current environment. + + 2. Cross-Environment Reference (Root Path): + Syntax: `${staging.DEBUG}` + - Environment: Different environment (e.g., `staging`). + - Path: Root path (`/`) of the specified environment. + - Secret Key: `DEBUG` + - Description: References a secret named `DEBUG` in the root path of the `staging` environment. + + 3. Cross-Environment Reference (Specific Path): + Syntax: `${prod./frontend/SECRET_KEY}` + - Environment: Different environment (e.g., `prod`). + - Path: Specifies a path within the environment (`/frontend/`). + - Secret Key: `SECRET_KEY` + - Description: References a secret named `SECRET_KEY` located at `/frontend/` in the `prod` environment. + + 4. Local Reference (Specified Path): + Syntax: `${/backend/payments/STRIPE_KEY}` + - Environment: Same as the current environment. + - Path: Specifies a path within the environment (`/backend/payments/`). + - Secret Key: `STRIPE_KEY` + - Description: References a secret named `STRIPE_KEY` located at `/backend/payments/` in the current environment. + + Note: + The syntax allows for flexible secret management, enabling both straightforward local references and more complex cross-environment references. +""" + + +def split_path_and_key(ref: str) -> tuple: + """ + Splits a reference string into path and key components. + + Args: + ref (str): The reference string to split. + + Returns: + tuple: A tuple containing the path and key. + """ + last_slash_index = ref.rfind("/") + if last_slash_index != -1: + path = ref[:last_slash_index] + key_name = ref[last_slash_index + 1:] + else: + path = "/" + key_name = ref + + # Ensure path starts with a slash + if not path.startswith("/"): + path = "/" + path + + return path, key_name + + +def resolve_secret_reference(ref: str, secrets_dict: Dict[str, Dict[str, Dict[str, str]]], phase: 'Phase', current_application_name: str, current_env_name: str) -> str: + """ + Resolves a single secret reference to its actual value by fetching it from the specified environment. + + The function supports both local and cross-environment secret references, allowing for flexible secret management. + Local references are identified by the absence of a dot '.' in the reference string, implying the current environment. + Cross-environment references include an environment name, separated by a dot from the rest of the path. + + Args: + ref (str): The secret reference string, which could be a local or cross-environment reference. + secrets_dict (Dict[str, Dict[str, Dict[str, str]]]): A dictionary containing known secrets. + phase ('Phase'): An instance of the Phase class to fetch secrets. + current_application_name (str): The name of the current application. + current_env_name (str): The current environment name, used for resolving local references. + + Returns: + str: The resolved secret value or the original reference if not resolved. + """ + env_name = current_env_name + path = "/" # Default root path + key_name = ref + + # Parse the reference to identify environment, path, and secret key. + if "." in ref: # Cross-environment references + parts = ref.split(".", 1) + env_name, rest = parts[0], parts[1] + path, key_name = split_path_and_key(rest) + else: # Local reference + path, key_name = split_path_and_key(ref) + + try: + # Lookup with environment, path, and key + if env_name in secrets_dict: + # Try to find the secret in the exact path + if path in secrets_dict[env_name] and key_name in secrets_dict[env_name][path]: + return secrets_dict[env_name][path][key_name] + + # For local references, try to find the secret in the root path only if the original path was root + if env_name == current_env_name and path == "/" and '/' in secrets_dict[env_name] and key_name in secrets_dict[env_name]['/']: + return secrets_dict[env_name]['/'][key_name] + + # If the secret is not found in secrets_dict, try to fetch it from Phase + fetched_secrets = phase.get(env_name=env_name, app_name=current_application_name, keys=[key_name], path=path) + for secret in fetched_secrets: + if secret["key"] == key_name: + return secret["value"] + except EnvironmentNotFoundException: + pass + + # Return the reference as is if not resolved + return f"${{{ref}}}" + + +def resolve_all_secrets(value: str, all_secrets: List[Dict[str, str]], phase: 'Phase', current_application_name: str, current_env_name: str) -> str: + """ + Resolves all secret references within a given string to their actual values. + + This function is particularly useful for processing configuration strings or entire files that + may contain multiple secret references. It iterates through each reference found in the input string, + resolves it using `resolve_secret_reference`, and replaces the reference with the resolved value. + + Args: + value (str): The input string containing one or more secret references. + all_secrets (List[Dict[str, str]]): A list of all known secrets. + phase ('Phase'): An instance of the Phase class to fetch secrets. + current_application_name (str): The name of the current application. + current_env_name (str): The current environment name for resolving local references. + + Returns: + str: The input string with all secret references resolved to their actual values. + """ + + secrets_dict = {} + for secret in all_secrets: + env_name = secret['environment'] + path = secret['path'] + key = secret['key'] + if env_name not in secrets_dict: + secrets_dict[env_name] = {} + if path not in secrets_dict[env_name]: + secrets_dict[env_name][path] = {} + secrets_dict[env_name][path][key] = secret['value'] + + refs = SECRET_REF_REGEX.findall(value) + resolved_value = value + # Resolve each found reference and replace it with resolved_secret_value. + for ref in refs: + resolved_secret_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + resolved_value = resolved_value.replace(f"${{{ref}}}", resolved_secret_value) + + return resolved_value diff --git a/src/phase/version.py b/src/phase/version.py deleted file mode 100644 index e843230..0000000 --- a/src/phase/version.py +++ /dev/null @@ -1,2 +0,0 @@ -__version__ = "1.1.0" -__ph_version__ = "v1" diff --git a/tests/test_crypto.py b/tests/test_crypto.py new file mode 100644 index 0000000..e435ec7 --- /dev/null +++ b/tests/test_crypto.py @@ -0,0 +1,149 @@ +import pytest +import base64 +import re +import os +from src.phase.utils.crypto import CryptoUtils +from src.phase.utils.crypto import generate_random_secret +from nacl.secret import SecretBox + +class TestCryptoUtils: + def test_random_key_pair(self): + # Testing if the generated key pair (public and private keys) are of the correct length (32 bytes each) + public_key, private_key = CryptoUtils.random_key_pair() + assert len(public_key) == 32 + assert len(private_key) == 32 + + def test_client_session_keys(self): + # Testing client session keys generation by ensuring the keys are of the correct length + client_keypair = CryptoUtils.random_key_pair() + server_keypair = CryptoUtils.random_key_pair() + client_keys = CryptoUtils.client_session_keys(client_keypair, server_keypair[0]) + assert len(client_keys[0]) == 32 and len(client_keys[1]) == 32 + + def test_server_session_keys(self): + # Testing server session keys generation similar to client session keys + client_keypair = CryptoUtils.random_key_pair() + server_keypair = CryptoUtils.random_key_pair() + server_keys = CryptoUtils.server_session_keys(server_keypair, client_keypair[0]) + assert len(server_keys[0]) == 32 and len(server_keys[1]) == 32 + +def test_encrypt_and_decrypt_asymmetric(): + # Testing asymmetric encryption and decryption to ensure the decrypted text matches the original plaintext + test_plaintext = "Saigon, I'm still only in Saigon. Every time I think I'm gonna wake up back in the jungle.." + public_key, private_key = CryptoUtils.random_key_pair() + encrypted_data = CryptoUtils.encrypt_asymmetric(test_plaintext, public_key.hex()) + decrypted_data = CryptoUtils.decrypt_asymmetric(encrypted_data, private_key.hex(), public_key.hex()) + pattern = rf"ph:v{CryptoUtils.VERSION}:[0-9a-fA-F]{{64}}:.+" + assert re.match(pattern, encrypted_data) is not None + assert decrypted_data == test_plaintext + +class TestGenerateRandomSecret: + @pytest.mark.parametrize("secret_type, expected_length", [ + # Testing random secret generation for various types and lengths + ('hex', 64), ('alphanumeric', 32), ('base64', 44), ('base64url', 44), + ('key128', 24), ('key256', 44) + ]) + def test_generate_random_secret_valid_types(self, secret_type, expected_length): + secret = generate_random_secret(secret_type, 32) + assert len(secret) == expected_length + if secret_type == 'hex': + assert re.fullmatch(r'[0-9a-fA-F]+', secret) is not None + elif secret_type == 'alphanumeric': + assert re.fullmatch(r'[0-9a-zA-Z]+', secret) is not None + elif secret_type in ['base64', 'base64url']: + base64_bytes = base64.urlsafe_b64decode(secret + '==') + assert len(base64_bytes) == 32 + + def test_generate_random_secret_invalid_type(self): + # Ensuring that an invalid secret type raises a ValueError + with pytest.raises(ValueError): + generate_random_secret('invalid', 32) + + def test_generate_random_secret_invalid_length(self): + # Ensuring that an invalid length (e.g., zero) raises a ValueError + with pytest.raises(ValueError): + generate_random_secret('hex', 0) + + @pytest.mark.parametrize("key_type", ['key256']) + def test_encrypt_decrypt_with_generated_keys(self, key_type): + # Testing encryption and decryption with XChaCha20-Poly1305 using a generated key + key = generate_random_secret(key_type) + key_bytes = base64.b64decode(key) + test_plaintext = "Test message for encryption and decryption." + encrypted_text = CryptoUtils.encrypt_string(test_plaintext, key_bytes) + decrypted_plaintext = CryptoUtils.decrypt_string(encrypted_text, key_bytes) + assert decrypted_plaintext == test_plaintext + + # AES 256-bit custom encrypt and decrypt methods + @staticmethod + def encrypt_aes256(plaintext, key): + # Ensuring key length for AES 256 is 32 bytes and performing encryption + assert len(key) == 32 + secret_box = SecretBox(key) + return secret_box.encrypt(plaintext.encode()) + + @staticmethod + def decrypt_aes256(ciphertext, key): + # Ensuring key length for AES 256 is 32 bytes and performing decryption + assert len(key) == 32 + secret_box = SecretBox(key) + return secret_box.decrypt(ciphertext).decode() + + def test_aes256_encryption_decryption(self): + # Testing AES 256 encryption and decryption to ensure the decrypted text matches the original plaintext + key = generate_random_secret('key256') + key_bytes = base64.b64decode(key) + test_plaintext = "Test message for AES 256." + ciphertext = TestGenerateRandomSecret.encrypt_aes256(test_plaintext, key_bytes) + decrypted_plaintext = TestGenerateRandomSecret.decrypt_aes256(ciphertext, key_bytes) + assert decrypted_plaintext == test_plaintext + +class TestBlake2bDigest: + def test_blake2b_digest_length(self): + # Testing the length of the BLAKE2b hash to ensure it's 64 characters (32 bytes hex encoded) + input_str = "test string" + salt = "salt" + result = CryptoUtils.blake2b_digest(input_str, salt) + assert len(result) == 64 + + def test_blake2b_digest_consistency(self): + # Testing hash consistency for the same input and salt + input_str = "consistent input" + salt = "consistent salt" + hash1 = CryptoUtils.blake2b_digest(input_str, salt) + hash2 = CryptoUtils.blake2b_digest(input_str, salt) + assert hash1 == hash2 + + def test_blake2b_digest_unique_with_different_inputs(self): + # Ensuring different inputs with the same salt produce different hashes + salt = "salt" + hash1 = CryptoUtils.blake2b_digest("input1", salt) + hash2 = CryptoUtils.blake2b_digest("input2", salt) + assert hash1 != hash2 + + def test_blake2b_digest_unique_with_different_salts(self): + # Ensuring the same input with different salts produces different hashes + input_str = "input" + hash1 = CryptoUtils.blake2b_digest(input_str, "salt1") + hash2 = CryptoUtils.blake2b_digest(input_str, "salt2") + assert hash1 != hash2 + + @pytest.mark.parametrize("input_str, salt, expected_hash", [ + # Testing known hash values for specific inputs and salts + ("hello", "world", "38010cfe3a8e684cb17e6d049525e71d4e9dc3be173fc05bf5c5ca1c7e7c25e7"), + ("another test", "another salt", "5afad949edcfb22bd24baeed4e75b0aeca41731b8dff78f989a5a4c0564f211f") + ]) + def test_blake2b_digest_known_values(self, input_str, salt, expected_hash): + # Testing that the calculated hash matches the expected known hash + result = CryptoUtils.blake2b_digest(input_str, salt) + assert result == expected_hash + +class TestSecretSplitting: + def test_xor_secret_splitting_and_reconstruction(self): + # Testing XOR-based secret splitting and reconstruction + original_secret_hex = "6eed8a70ac9e75ab1894b06d4a5e21d1072649529753f3244316c6d9e4c9c951" + original_secret_bytes = bytes.fromhex(original_secret_hex) + random_share = os.urandom(len(original_secret_bytes)) + second_share = CryptoUtils.xor_bytes(original_secret_bytes, random_share) + reconstructed_secret_hex = CryptoUtils.reconstruct_secret([random_share.hex(), second_share.hex()]) + assert reconstructed_secret_hex == original_secret_hex diff --git a/tests/test_decrypt.py b/tests/test_decrypt.py deleted file mode 100644 index added1e..0000000 --- a/tests/test_decrypt.py +++ /dev/null @@ -1,40 +0,0 @@ -import pytest -from src.phase.phase import Phase - -APP_ID = "phApp:v1:e0e50cb9a1953c610126b4092093b1beca51d08d91fc3d9f8d90482a32853215" -APP_SECRET = "pss:v1:d261abecb6708c18bebdb8b2748ee574e2b0bdeaf19b081a5f10006cc83d48d0:d146c8c6d326a7842ff9b2da0da455b3f7f568a70808e2eb0cfc5143d4fe170f:59e413612e06d75d251e3416361d0743345a9c9eda1cbcf2b1ef16e3077c011c" -APP_SECRET_INVALID = "pss:v1:d251abecb6708c18bebdb8b2748ee574e2b0bdeaf19b081a5f10006cc83d48d0:d146c8c6d326a7842ff9b2da0da455b3f7f568a70808e2eb0cfc5143d4fe170d:59e413612e06d75d251e3416361d0743345a9c9eda1cbcf2b1ef16e3077c012d" - - -@pytest.fixture(scope="module") -def phase_instance(): - return Phase(APP_ID, APP_SECRET) - - -def mock_fetch_app_key(appToken, wrapKey, appId, dataSize, custom_kms_host=None): - return "e35ae9560207c90fa3dd68a8715e13a1ef988bffa284db73f04328df17f37cfe" - - -def test_phase_decrypt_returns_correct_plaintext(phase_instance, monkeypatch): - data = "Signal" - - monkeypatch.setattr("src.phase.phase.fetch_app_key", mock_fetch_app_key) - - ciphertext = phase_instance.encrypt(data) - - plaintext = phase_instance.decrypt(ciphertext) - - assert plaintext is not None - assert plaintext == data - - -def test_phase_decrypt_fails_with_incorrect_app_secret(monkeypatch): - phase = Phase(APP_ID, APP_SECRET_INVALID) - - monkeypatch.setattr("src.phase.phase.fetch_app_key", mock_fetch_app_key) - - data = "Signal" - ciphertext = phase.encrypt(data) - - with pytest.raises(ValueError, match="Something went wrong"): - phase.decrypt(ciphertext) diff --git a/tests/test_encrypt.py b/tests/test_encrypt.py deleted file mode 100644 index e45cefe..0000000 --- a/tests/test_encrypt.py +++ /dev/null @@ -1,41 +0,0 @@ -import pytest -import re -from src.phase import Phase - -APP_ID = "phApp:v1:cd2d579490fd794f1640590220de86a3676fa7979d419056bc631741b320b701" -APP_SECRET = "pss:v1:a7a0822aa4a4e4d37919009264200ba6ab978d92c8b4f7db5ae9ce0dfaf604fe:801605dfb89822ff52957abe39949bcfc44b9058ad81de58dd54fb0b110037b4b2bbde5a1143d31bbb3895f72e4ee52f5bd:625d395987f52c37022063eaf9b6260cad9ca03c99609213f899cae7f1bb04e7" - - -@pytest.fixture(scope="module") -def phase_instance(): - return Phase(APP_ID, APP_SECRET) - - -def test_phase_encrypt_returns_valid_ph(phase_instance): - plaintext = "Signal" - tag = "Phase Tag" - PH_VERSION = "v1" - - ciphertext = phase_instance.encrypt(plaintext, tag) - - assert ciphertext is not None - segments = ciphertext.split(":") - assert len(segments) == 5 - assert segments[0] == "ph" - assert segments[1] == PH_VERSION - assert segments[4] == tag - assert re.match("^[0-9a-f]+$", segments[2]) is not None - assert re.match( - "^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=$)", segments[3]) is not None - - -def test_phase_encrypt_produces_same_length_ciphertexts(phase_instance): - data = "hello world" - num_of_trials = 10 - ciphertext_lengths = set() - - for _ in range(num_of_trials): - ciphertext = phase_instance.encrypt(data) - ciphertext_lengths.add(len(ciphertext)) - - assert len(ciphertext_lengths) == 1 diff --git a/tests/test_init.py b/tests/test_init.py deleted file mode 100644 index 37bb02b..0000000 --- a/tests/test_init.py +++ /dev/null @@ -1,22 +0,0 @@ -from src.phase import Phase -import pytest - -APP_ID_INVALID = "phApp:v1:cd2d579490fd794f1640590220de86a3676fa7979d419056bc631741b320b701" -APP_SECRET_INVALID = "pss:v1:a7a0822aa4a4e4d37919009264200ba6ab978d92c8b4f7db5ae9ce0dfaf604fe:801605dfb89822ff52957abe39949bcfc44b9058ad81de58dd54fb0b110037b4b2bbde5a1143d31bbb3895f72e4ee52f5bd:625d395987f52c37022063eaf9b6260cad9ca03c99609213f899cae7f1bb04e7" - - -@pytest.fixture(scope="module") -def phase_instance(): - return Phase(APP_ID_INVALID, APP_SECRET_INVALID) - - -def test_init_fails_with_invalid_app_id(phase_instance): - invalid_app_id = "phApp:version:cd2d579490fd794f1640590220de86a3676fa7979d419056bc631741b320b701" - with pytest.raises(ValueError, match="Invalid Phase APP_ID"): - Phase(invalid_app_id, APP_SECRET_INVALID) - - -def test_test_init_fails_with_invalid_app_secret(phase_instance): - invalid_app_secret = "pss:v1:00000000000000000000000000000000:00000000000000000000000000000000:00000000000000000000000000000000" - with pytest.raises(ValueError, match="Invalid Phase APP_SECRET"): - Phase(APP_ID_INVALID, invalid_app_secret) diff --git a/tests/test_secret_referencing.py b/tests/test_secret_referencing.py new file mode 100644 index 0000000..efd6a95 --- /dev/null +++ b/tests/test_secret_referencing.py @@ -0,0 +1,118 @@ +import pytest +from unittest.mock import Mock, patch +from src.phase.utils.secret_referencing import resolve_secret_reference, resolve_all_secrets +from src.phase.utils.exceptions import EnvironmentNotFoundException +from src.phase.utils.const import SECRET_REF_REGEX + +# Mock data for secrets +secrets_dict = { + "current": { + "/": { + "KEY": "value1" + }, + "/backend/payments": { + "STRIPE_KEY": "stripe_value" + } + }, + "staging": { + "/": { + "DEBUG": "staging_debug_value" + } + }, + "prod": { + "/frontend": { + "SECRET_KEY": "prod_secret_value" + } + } +} + +# Mock Phase class +class MockPhase: + def get(self, env_name, app_name, keys, path): + if env_name == "prod" and path == "/frontend": + return [{"key": "SECRET_KEY", "value": "prod_secret_value"}] + raise EnvironmentNotFoundException(env_name=env_name) + +@pytest.fixture +def phase(): + return MockPhase() + +@pytest.fixture +def current_env_name(): + return "current" + +@pytest.fixture +def current_application_name(): + return "test_app" + +def test_resolve_local_reference_root(phase, current_application_name, current_env_name): + ref = "KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "value1" + +def test_resolve_local_reference_path(phase, current_application_name, current_env_name): + ref = "/backend/payments/STRIPE_KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "stripe_value" + +def test_resolve_cross_environment_root(phase, current_application_name, current_env_name): + ref = "staging.DEBUG" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "staging_debug_value" + +def test_resolve_cross_environment_path(phase, current_application_name, current_env_name): + ref = "prod./frontend/SECRET_KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "prod_secret_value" + +def test_resolve_all_secrets(phase, current_application_name, current_env_name): + value = "Use this key: ${KEY}, and this staging key: ${staging.DEBUG}, and this path key: ${/backend/payments/STRIPE_KEY}" + all_secrets = [ + {"environment": "current", "path": "/", "key": "KEY", "value": "value1"}, + {"environment": "staging", "path": "/", "key": "DEBUG", "value": "staging_debug_value"}, + {"environment": "current", "path": "/backend/payments", "key": "STRIPE_KEY", "value": "stripe_value"} + ] + resolved_value = resolve_all_secrets(value, all_secrets, phase, current_application_name, current_env_name) + expected_value = "Use this key: value1, and this staging key: staging_debug_value, and this path key: stripe_value" + assert resolved_value == expected_value + +# Edge Case: Missing key in the current environment +def test_resolve_missing_local_key(phase, current_application_name, current_env_name): + ref = "MISSING_KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "${MISSING_KEY}" + +# Edge Case: Missing key in a cross environment reference +def test_resolve_missing_cross_env_key(phase, current_application_name, current_env_name): + ref = "prod.MISSING_KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "${prod.MISSING_KEY}" + +# Edge Case: Missing path in a cross environment reference +def test_resolve_missing_cross_env_path(phase, current_application_name, current_env_name): + ref = "prod./missing_path/SECRET_KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "${prod./missing_path/SECRET_KEY}" + +# Complex Case: Mixed references with missing values +def test_resolve_mixed_references_with_missing(phase, current_application_name, current_env_name): + value = "Local: ${KEY}, Missing Local: ${MISSING_KEY}, Cross: ${staging.DEBUG}, Missing Cross: ${prod.MISSING_KEY}" + all_secrets = [ + {"environment": "current", "path": "/", "key": "KEY", "value": "value1"}, + {"environment": "staging", "path": "/", "key": "DEBUG", "value": "staging_debug_value"} + ] + resolved_value = resolve_all_secrets(value, all_secrets, phase, current_application_name, current_env_name) + expected_value = "Local: value1, Missing Local: ${MISSING_KEY}, Cross: staging_debug_value, Missing Cross: ${prod.MISSING_KEY}" + assert resolved_value == expected_value + +# Edge Case: Local reference with missing path +def test_resolve_local_reference_missing_path(phase, current_application_name, current_env_name): + ref = "/missing_path/KEY" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "${/missing_path/KEY}" + +# Edge Case: Invalid reference format +def test_resolve_invalid_reference_format(phase, current_application_name, current_env_name): + ref = "invalid_format" + resolved_value = resolve_secret_reference(ref, secrets_dict, phase, current_application_name, current_env_name) + assert resolved_value == "${invalid_format}" \ No newline at end of file