diff --git a/backend/api/utils/crypto.py b/backend/api/utils/crypto.py index bea354c5e..f59757375 100644 --- a/backend/api/utils/crypto.py +++ b/backend/api/utils/crypto.py @@ -1,3 +1,4 @@ +import re from nacl.hash import blake2b from nacl.utils import random from base64 import b64encode, b64decode @@ -16,6 +17,7 @@ from typing import Tuple from typing import List +PREFIX = "ph" VERSION = 1 @@ -69,7 +71,7 @@ def encrypt_asymmetric(plaintext, public_key_hex): ciphertext = encrypt_string(plaintext, symmetric_keys[1]) - return f"ph:v{VERSION}:{public_key.hex()}:{ciphertext}" + return f"{PREFIX}:v{VERSION}:{public_key.hex()}:{ciphertext}" def decrypt_asymmetric(ciphertext_string, private_key_hex, public_key_hex): @@ -198,3 +200,33 @@ def blake2b_digest(input_str: str, salt: str) -> str: ) hex_encoded = hashed.hex() return hex_encoded + + +def validate_encrypted_string(encrypted_string): + """ + Validates if the given string matches the phase encrypted data format. + + The expected format is: `ph:v1::` + where: + - `ph` is the fixed prefix. + - `v1` is the fixed version. + - `` is a hexadecimal string. + - `` is a Base64-encoded string. + + Parameters: + encrypted_string (str): The encrypted string to validate. + + Returns: + bool: True if the string matches the expected format, False otherwise. + """ + if encrypted_string: + # Define the regular expression pattern for an encrypted string + pattern = re.compile(f"ph:v{VERSION}:[0-9a-fA-F]{{64}}:.+") + + # Match the string against the pattern + match = re.match(pattern, encrypted_string) + + # Return True if it matches, otherwise False + return bool(match) + + return True diff --git a/backend/api/views/secrets.py b/backend/api/views/secrets.py index 5f98bf08c..e684eb8a1 100644 --- a/backend/api/views/secrets.py +++ b/backend/api/views/secrets.py @@ -20,7 +20,7 @@ from api.utils.permissions import user_can_access_environment from api.utils.audit_logging import log_secret_event -from api.utils.crypto import encrypt_asymmetric +from api.utils.crypto import encrypt_asymmetric, validate_encrypted_string from api.utils.rest import ( get_resolver_request_meta, ) @@ -107,6 +107,18 @@ def post(self, request): return JsonResponse({"error": "Duplicate secret found"}, status=409) for secret in request_body["secrets"]: + + # Check that all encrypted fields are valid + encrypted_fields = [secret["key"], secret["value"], secret["comment"]] + if "override" in secret: + encrypted_fields.append(secret["override"]["value"]) + + for encrypted_field in encrypted_fields: + if not validate_encrypted_string(encrypted_field): + return JsonResponse( + {"error": "Invalid ciphertext format"}, status=400 + ) + tags = SecretTag.objects.filter(id__in=secret["tags"]) try: @@ -169,6 +181,18 @@ def put(self, request): return JsonResponse({"error": "Duplicate secret found"}, status=409) for secret in request_body["secrets"]: + + # Check that all encrypted fields are valid + encrypted_fields = [secret["key"], secret["value"], secret["comment"]] + if "override" in secret: + encrypted_fields.append(secret["override"]["value"]) + + for encrypted_field in encrypted_fields: + if not validate_encrypted_string(encrypted_field): + return JsonResponse( + {"error": "Invalid ciphertext format"}, status=400 + ) + secret_obj = Secret.objects.get(id=secret["id"]) tags = SecretTag.objects.filter(id__in=secret["tags"])