diff --git a/README.md b/README.md index 76df54e..f108e48 100644 --- a/README.md +++ b/README.md @@ -111,21 +111,6 @@ result = phase.delete_secret(delete_options) print(f"Delete result: {result}") ``` -### Resolve Secret References - -Resolve references in secret values: - -```python -get_options = GetAllSecretsOptions( - env_name="Development", - app_name="Your App Name" -) -secrets = phase.get_all_secrets(get_options) -resolved_secrets = phase.resolve_references(secrets, "Development", "Your App Name") -for secret in resolved_secrets: - print(f"Key: {secret.key}, Resolved Value: {secret.value}") -``` - ## Error Handling The SDK methods may raise exceptions for various error conditions. It's recommended to wrap SDK calls in try-except blocks to handle potential errors: diff --git a/pyproject.toml b/pyproject.toml index badd3cf..a917c55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "phase_dev" -version = "2.0.1" +version = "2.1.0" description = "Python SDK for Phase secrets manager" readme = "README.md" requires-python = ">=3.10" diff --git a/src/phase/phase.py b/src/phase/phase.py index 602fa68..27c33b0 100644 --- a/src/phase/phase.py +++ b/src/phase/phase.py @@ -6,43 +6,68 @@ @dataclass class GetSecretOptions: env_name: str - app_name: str + app_name: Optional[str] = None + app_id: Optional[str] = None key_to_find: Optional[str] = None tag: Optional[str] = None secret_path: str = "/" + def __post_init__(self): + if not self.app_name and not self.app_id: + raise ValueError("Either app_name or app_id must be provided") + @dataclass class GetAllSecretsOptions: env_name: str - app_name: str + app_name: Optional[str] = None + app_id: Optional[str] = None tag: Optional[str] = None secret_path: str = "/" + def __post_init__(self): + if not self.app_name and not self.app_id: + raise ValueError("Either app_name or app_id must be provided") + @dataclass class CreateSecretsOptions: env_name: str - app_name: str key_value_pairs: List[Dict[str, str]] + app_name: Optional[str] = None + app_id: Optional[str] = None secret_path: str = "/" + def __post_init__(self): + if not self.app_name and not self.app_id: + raise ValueError("Either app_name or app_id must be provided") + @dataclass class UpdateSecretOptions: env_name: str - app_name: str key: str value: Optional[str] = None + app_name: Optional[str] = None + app_id: Optional[str] = None secret_path: str = "/" destination_path: Optional[str] = None override: bool = False toggle_override: bool = False + def __post_init__(self): + if not self.app_name and not self.app_id: + raise ValueError("Either app_name or app_id must be provided") + @dataclass class DeleteSecretOptions: env_name: str - app_name: str key_to_delete: str + app_name: Optional[str] = None + app_id: Optional[str] = None secret_path: str = "/" + def __post_init__(self): + if not self.app_name and not self.app_id: + raise ValueError("Either app_name or app_id must be provided") + @dataclass class PhaseSecret: key: str @@ -51,49 +76,127 @@ class PhaseSecret: path: str = "/" tags: List[str] = field(default_factory=list) overridden: bool = False + application: Optional[str] = None + environment: Optional[str] = None class Phase: def __init__(self, init=True, pss=None, host=None): self._phase_io = PhaseIO(init=init, pss=pss, host=host) + def _resolve_secret_values(self, secrets: List[PhaseSecret], env_name: str, app_name: str) -> List[PhaseSecret]: + """ + Utility function to resolve secret references within secret values. + + Args: + secrets (List[PhaseSecret]): List of secrets to process + env_name (str): Environment name for secret resolution + app_name (str): Application name for secret resolution + + Returns: + List[PhaseSecret]: List of secrets with resolved values + """ + # Convert PhaseSecret objects to dict format expected by resolve_all_secrets + all_secrets = [ + { + 'environment': secret.environment or env_name, + 'path': secret.path, + 'key': secret.key, + 'value': secret.value + } + for secret in secrets + ] + + # Create new list of secrets with resolved values + resolved_secrets = [] + for secret in secrets: + resolved_value = resolve_all_secrets( + value=secret.value, + all_secrets=all_secrets, + phase=self._phase_io, + current_application_name=secret.application or app_name, + current_env_name=secret.environment or env_name + ) + + resolved_secrets.append(PhaseSecret( + key=secret.key, + value=resolved_value, + comment=secret.comment, + path=secret.path, + tags=secret.tags, + overridden=secret.overridden, + application=secret.application, + environment=secret.environment + )) + + return resolved_secrets + def get_secret(self, options: GetSecretOptions) -> Optional[PhaseSecret]: secrets = self._phase_io.get( env_name=options.env_name, keys=[options.key_to_find] if options.key_to_find else None, app_name=options.app_name, + app_id=options.app_id, tag=options.tag, path=options.secret_path ) if secrets: secret = secrets[0] - return PhaseSecret( + phase_secret = PhaseSecret( key=secret['key'], value=secret['value'], comment=secret.get('comment', ''), path=secret.get('path', '/'), tags=secret.get('tags', []), - overridden=secret.get('overridden', False) + overridden=secret.get('overridden', False), + application=secret.get('application'), + environment=secret.get('environment') + ) + + # Resolve any secret references in the value + resolved_secrets = self._resolve_secret_values( + [phase_secret], + options.env_name, + secret.get('application', options.app_name) ) + + return resolved_secrets[0] if resolved_secrets else None return None def get_all_secrets(self, options: GetAllSecretsOptions) -> List[PhaseSecret]: secrets = self._phase_io.get( env_name=options.env_name, app_name=options.app_name, + app_id=options.app_id, tag=options.tag, path=options.secret_path ) - return [ + + if not secrets: + return [] + + # Get the application name from the first secret + app_name = secrets[0].get('application', options.app_name) + + phase_secrets = [ PhaseSecret( key=secret['key'], value=secret['value'], comment=secret.get('comment', ''), path=secret.get('path', '/'), tags=secret.get('tags', []), - overridden=secret.get('overridden', False) + overridden=secret.get('overridden', False), + application=secret.get('application'), + environment=secret.get('environment') ) for secret in secrets ] + + # Resolve any secret references in the values + return self._resolve_secret_values( + phase_secrets, + options.env_name, + app_name + ) def create_secrets(self, options: CreateSecretsOptions) -> str: # Convert the list of dictionaries to a list of tuples @@ -103,6 +206,7 @@ def create_secrets(self, options: CreateSecretsOptions) -> str: key_value_pairs=key_value_tuples, env_name=options.env_name, app_name=options.app_name, + app_id=options.app_id, path=options.secret_path ) return "Success" if response.status_code == 200 else f"Error: {response.status_code}" @@ -113,6 +217,7 @@ def update_secret(self, options: UpdateSecretOptions) -> str: key=options.key, value=options.value, app_name=options.app_name, + app_id=options.app_id, source_path=options.secret_path, destination_path=options.destination_path, override=options.override, @@ -124,29 +229,6 @@ def delete_secret(self, options: DeleteSecretOptions) -> List[str]: env_name=options.env_name, keys_to_delete=[options.key_to_delete], app_name=options.app_name, + app_id=options.app_id, path=options.secret_path ) - - def resolve_references(self, secrets: List[PhaseSecret], env_name: str, app_name: str) -> List[PhaseSecret]: - all_secrets = [ - { - 'environment': env_name, - 'application': app_name, - 'key': secret.key, - 'value': secret.value, - 'path': secret.path - } - for secret in secrets - ] - - for secret in secrets: - resolved_value = resolve_all_secrets( - secret.value, - all_secrets, - self._phase_io, - app_name, - env_name - ) - secret.value = resolved_value - - return secrets \ No newline at end of file diff --git a/src/phase/utils/const.py b/src/phase/utils/const.py index 71c8fae..95410f1 100644 --- a/src/phase/utils/const.py +++ b/src/phase/utils/const.py @@ -1,7 +1,7 @@ import os import re -__version__ = "2.0.1" +__version__ = "2.1.0" __ph_version__ = "v1" diff --git a/src/phase/utils/misc.py b/src/phase/utils/misc.py index cd5c4c6..0b3e2f6 100644 --- a/src/phase/utils/misc.py +++ b/src/phase/utils/misc.py @@ -42,7 +42,7 @@ def get_default_user_token() -> str: raise ValueError("Default user not found in the config file.") -def phase_get_context(user_data, app_name=None, env_name=None): +def phase_get_context(user_data, app_name=None, env_name=None, app_id=None): """ Get the context (ID, name, and publicKey) for a specified application and environment or the default application and environment. @@ -50,6 +50,7 @@ def phase_get_context(user_data, app_name=None, env_name=None): - user_data (dict): The user data from the API response. - app_name (str, optional): The name (or partial name) of the desired application. - env_name (str, optional): The name (or partial name) of the desired environment. + - app_id (str, optional): The explicit application ID to use. Takes precedence over app_name if both are provided. Returns: - tuple: A tuple containing the application's name, application's ID, environment's name, environment's ID, and publicKey. @@ -57,29 +58,27 @@ def phase_get_context(user_data, app_name=None, env_name=None): Raises: - ValueError: If no matching application or environment is found. """ - - # 2. If env_name isn't explicitly provided, use the default + # 1. Set default environment name default_env_name = "Development" - app_id = None env_name = env_name or default_env_name - # 3. Match the application using app_id or find the best match for partial app_name + # 2. Match the application using app_id first, then fall back to app_name if app_id is not provided try: - if app_name: + if app_id: # app_id takes precedence + application = next((app for app in user_data["apps"] if app["id"] == app_id), None) + if not application: + raise ValueError(f"🔍 No application found with ID: '{app_id}'.") + elif app_name: # only check app_name if app_id is not provided matching_apps = [app for app in user_data["apps"] if app_name.lower() in app["name"].lower()] if not matching_apps: raise ValueError(f"🔍 No application found with the name '{app_name}'.") # Sort matching applications by the length of their names, shorter names are likely to be more specific matches matching_apps.sort(key=lambda app: len(app["name"])) application = matching_apps[0] - elif app_id: - application = next((app for app in user_data["apps"] if app["id"] == app_id), None) - if not application: - raise ValueError(f"🔍 No application found with the name '{app_name_from_config}' and ID: '{app_id}'.") else: - raise ValueError("🤔 No application context provided. Please run 'phase init' or pass the '--app' flag followed by your application name.") + raise ValueError("🤔 No application context provided. Please provide either app_name or app_id.") - # 4. Attempt to match environment with the exact name or a name that contains the env_name string + # 3. Attempt to match environment with the exact name or a name that contains the env_name string environment = next((env for env in application["environment_keys"] if env_name.lower() in env["environment"]["name"].lower()), None) if not environment: diff --git a/src/phase/utils/network.py b/src/phase/utils/network.py index 0e4872b..59c6fa2 100644 --- a/src/phase/utils/network.py +++ b/src/phase/utils/network.py @@ -77,7 +77,7 @@ def construct_http_headers(token_type: str, app_token: str) -> Dict[str, str]: Dict[str, str]: The common headers including User-Agent. """ return { - "Authorization": f"Bearer {token_type.capitalize()} {app_token}", + "Authorization": f"Bearer {token_type} {app_token}", "User-Agent": get_user_agent() } diff --git a/src/phase/utils/phase_io.py b/src/phase/utils/phase_io.py index 024f392..ccd5d6e 100644 --- a/src/phase/utils/phase_io.py +++ b/src/phase/utils/phase_io.py @@ -58,11 +58,16 @@ def __init__(self, init=True, pss=None, host=None): token_type = "service token" if "pss_service" in app_secret else "user token" raise ValueError(f"Invalid Phase {token_type}") - # Storing the token type as a string for easier access - self._token_type = "service" if self.is_service_token else "user" - + # Store token segments pss_segments = app_secret.split(':') self._app_secret = AppSecret(*pss_segments) + + # If type service_token && version == 2; set token header as ServiceAccount + if self.is_service_token and self._app_secret.pes_version == "v2": + self._token_type = "ServiceAccount" + # Else decide between User token or legacy service token header + else: + self._token_type = "Service" if self.is_service_token else "User" def _find_matching_environment_key(self, user_data, env_id): @@ -95,14 +100,15 @@ def init(self): return response.json() - def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name: str, path: str = '/', override_value: Optional[str] = None) -> requests.Response: + def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name: Optional[str] = None, app_id: Optional[str] = None, path: str = '/', override_value: Optional[str] = None) -> requests.Response: """ Create secrets in Phase KMS with support for specifying a path and overrides. Args: key_value_pairs (List[Tuple[str, str]]): List of tuples where each tuple contains a key and a value. env_name (str): The name (or partial name) of the desired environment. - app_name (str): The name of the application context. + app_name (str, optional): The name of the application context. + app_id (str, optional): The ID of the application. Takes precedence over app_name if both are provided. path (str, optional): The path under which to store the secrets. Defaults to the root path '/'. override_value (str, optional): The overridden value for the secret. Defaults to None. @@ -114,7 +120,7 @@ def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") user_data = user_response.json() - app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name, app_id=app_id) environment_key = self._find_matching_environment_key(user_data, env_id) if environment_key is None: @@ -150,7 +156,7 @@ def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name return create_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secrets, self._api_host) - def get(self, env_name: str, keys: List[str] = None, app_name: Optional[str] = None, tag: Optional[str] = None, path: str = '') -> List[Dict]: + def get(self, env_name: str, keys: List[str] = None, app_name: Optional[str] = None, app_id: Optional[str] = None, tag: Optional[str] = None, path: str = '') -> List[Dict]: """ Get secrets from Phase KMS based on key and environment, with support for personal overrides, optional tag matching, decrypting comments, and now including path support and key digest optimization. @@ -159,6 +165,7 @@ def get(self, env_name: str, keys: List[str] = None, app_name: Optional[str] = N env_name (str): The name (or partial name) of the desired environment. keys (List[str], optional): The keys for which to retrieve the secret values. app_name (str, optional): The name of the desired application. + app_id (str, optional): The ID of the application. Takes precedence over app_name if both are provided. tag (str, optional): The tag to match against the secrets. path (str, optional): The path under which to fetch secrets, default is root. @@ -171,7 +178,7 @@ def get(self, env_name: str, keys: List[str] = None, app_name: Optional[str] = N raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") user_data = user_response.json() - app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name, app_id=app_id) environment_key = self._find_matching_environment_key(user_data, env_id) if environment_key is None: @@ -231,7 +238,7 @@ def get(self, env_name: str, keys: List[str] = None, app_name: Optional[str] = N return results - def update(self, env_name: str, key: str, value: Optional[str] = None, app_name: Optional[str] = None, source_path: str = '', destination_path: Optional[str] = None, override: bool = False, toggle_override: bool = False) -> str: + def update(self, env_name: str, key: str, value: Optional[str] = None, app_name: Optional[str] = None, app_id: Optional[str] = None, source_path: str = '', destination_path: Optional[str] = None, override: bool = False, toggle_override: bool = False) -> str: """ Update a secret in Phase KMS based on key and environment, with support for source and destination paths. @@ -240,8 +247,9 @@ def update(self, env_name: str, key: str, value: Optional[str] = None, app_name: key (str): The key for which to update the secret value. value (str, optional): The new value for the secret. Defaults to None. app_name (str, optional): The name of the desired application. + app_id (str, optional): The ID of the application. Takes precedence over app_name if both are provided. source_path (str, optional): The current path of the secret. Defaults to root path '/'. - destination_path (str, optional): The new path for the secret, if changing its location. If not provided, the path is not updated. + destination_path (str, optional): The new path for the secret, if changing its location. override (bool, optional): Whether to update an overridden secret value. Defaults to False. toggle_override (bool, optional): Whether to toggle the override state between active and inactive. Defaults to False. @@ -254,7 +262,7 @@ def update(self, env_name: str, key: str, value: Optional[str] = None, app_name: raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") user_data = user_response.json() - app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name, app_id=app_id) environment_key = self._find_matching_environment_key(user_data, env_id) if environment_key is None: @@ -338,7 +346,7 @@ def update(self, env_name: str, key: str, value: Optional[str] = None, app_name: return f"Error: Failed to update secret. HTTP Status Code: {response.status_code}" - def delete(self, env_name: str, keys_to_delete: List[str], app_name: Optional[str] = None, path: Optional[str] = None) -> List[str]: + def delete(self, env_name: str, keys_to_delete: List[str], app_name: Optional[str] = None, app_id: Optional[str] = None, path: Optional[str] = None) -> List[str]: """ Delete secrets in Phase KMS based on keys and environment, with optional path support. @@ -346,6 +354,7 @@ def delete(self, env_name: str, keys_to_delete: List[str], app_name: Optional[st env_name (str): The name (or partial name) of the desired environment. keys_to_delete (List[str]): The keys for which to delete the secrets. app_name (str, optional): The name of the desired application. + app_id (str, optional): The ID of the application. Takes precedence over app_name if both are provided. path (str, optional): The path within which to delete the secrets. If specified, only deletes secrets within this path. Returns: @@ -357,7 +366,7 @@ def delete(self, env_name: str, keys_to_delete: List[str], app_name: Optional[st raise ValueError(f"Request failed with status code {user_response.status_code}: {user_response.text}") user_data = user_response.json() - app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name) + app_name, app_id, env_name, env_id, public_key = phase_get_context(user_data, app_name=app_name, env_name=env_name, app_id=app_id) environment_key = self._find_matching_environment_key(user_data, env_id) if environment_key is None: @@ -387,8 +396,9 @@ def delete(self, env_name: str, keys_to_delete: List[str], app_name: Optional[st keys_not_found.append(key) if secret_ids_to_delete: - delete_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secret_ids_to_delete, self._api_host) - + delete_response = delete_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secret_ids_to_delete, self._api_host) + if delete_response.status_code != 200: + raise ValueError(f"Failed to delete secrets. Status code: {delete_response.status_code}") return keys_not_found