From 8a6693f6abc181892c564c95a9a43a9453e6b7a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Hebei=C3=9F?= Date: Wed, 12 Jun 2024 15:17:39 +0000 Subject: [PATCH 1/2] Implemented folder lookup functionality --- conftest.py | 1 + delinea/secrets/server.py | 190 ++++++++++++++++++++++++++++++++++++++ tests/test_server.py | 20 +++- 3 files changed, 210 insertions(+), 1 deletion(-) diff --git a/conftest.py b/conftest.py index 8397600..445aa8a 100644 --- a/conftest.py +++ b/conftest.py @@ -15,6 +15,7 @@ def env_vars(): "secret_id": os.getenv("TSS_SECRET_ID"), "secret_path": os.getenv("TSS_SECRET_PATH"), "folder_id": os.getenv("TSS_FOLDER_ID"), + "folder_path": os.getenv("TSS_FOLDER_PATH") } diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 9766841..792d130 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -94,6 +94,57 @@ def __init__(self, **kwargs): } +@dataclass +class ServerFolder: + # Based on https://gist.github.com/jaytaylor/3660565 + @staticmethod + def snake_case(camel_cased): + """Transform to snake case + + Transforms the keys of the given map from camelCase to snake_case. + """ + return [ + ( + re.compile("([a-z0-9])([A-Z])") + .sub(r"\1_\2", re.compile(r"(.)([A-Z][a-z]+)").sub(r"\1_\2", k)) + .lower(), + v, + ) + for (k, v) in camel_cased.items() + ] + + @dataclass + class Field: + item_id: int + value: str + slug: str + + def __init__(self, **kwargs): + # The REST API returns attributes with camelCase names which we + # replace with snake_case per Python conventions + for k, v in ServerSecret.snake_case(kwargs): + if k == "item_value": + k = "value" + setattr(self, k, v) + + id: int + folder_name: str + folder_path: str + parent_folder_id: int + folder_type_id: int + secret_policy_id: int + inherit_secret_policy: bool + inherit_permissions: bool + child_folders: list + secret_templates: list + + def __init__(self, **kwargs): + # The REST API returns attributes with camelCase names which we replace + # with snake_case per Python conventions + for k, v in self.snake_case(kwargs): + setattr(self, k, v) + + class SecretServerError(Exception): """An Exception that includes a message and the server response""" @@ -311,6 +362,36 @@ def get_secret_json(self, id, query_params=None): ) ).text + def get_folder_json(self, id, query_params=None, get_all_children=True): + """Gets a Folder from Secret Server + + :param id: the id of the folder + :type id: int + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a JSON formatted string representation of the folder + :rtype: ``str`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the folder + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + endpoint_url = f"{self.api_url}/folders/{id}" + + if get_all_children: + query_params["getAllChildren"] = "true" + + if query_params is None: + return self.process(requests.get(endpoint_url, headers=self.headers())).text + else: + return self.process( + requests.get( + endpoint_url, + params=query_params, + headers=self.headers(), + ) + ).text + def get_secret(self, id, fetch_file_attachments=True, query_params=None): """Gets a secret @@ -355,6 +436,34 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None): ) return secret + def get_folder(self, id, query_params=None, get_all_children=False): + """Gets a folder + + :param id: the id of the folder + :type id: int + :param getAllChildren: Whether to retrieve all child folders of the requested folder + :type fetch_file_attachments: bool + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a ``dict`` representation of the folder + :rtype: ``dict`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the folder + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + + response = self.get_folder_json( + id, query_params=query_params, get_all_children=get_all_children + ) + + try: + folder = json.loads(response) + except json.JSONDecodeError: + raise SecretServerError(response) + + return folder + def get_secret_by_path(self, secret_path, fetch_file_attachments=True): """Gets a secret by path @@ -376,6 +485,23 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True): query_params=params, ) + def get_folder_by_path(self, folder_path, get_all_children=True): + """Gets a folder by path + + :param folder_path: full path of the folder + :type folder_path: str + :return: a ``dict`` representation of the folder + :rtype: ``dict`` + """ + path = "\\" + re.sub(r"[\\/]+", r"\\", folder_path).lstrip("\\").rstrip("\\") + + params = {"folderPath": path} + return self.get_folder( + id=0, + get_all_children=get_all_children, + query_params=params, + ) + def search_secrets(self, query_params=None): """Get Secrets from Secret Server @@ -401,6 +527,31 @@ def search_secrets(self, query_params=None): ) ).text + def lookup_folders(self, query_params=None): + """Lookup Folders from Secret Server + + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a JSON formatted string representation of the folders, containing only id and name + :rtype: ``str`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the secret + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + endpoint_url = f"{self.api_url}/folders/lookup" + + if query_params is None: + return self.process(requests.get(endpoint_url, headers=self.headers())).text + else: + return self.process( + requests.get( + endpoint_url, + params=query_params, + headers=self.headers(), + ) + ).text + def get_secret_ids_by_folderid(self, folder_id): """Gets a list of secrets ids by folder_id @@ -432,6 +583,45 @@ def get_secret_ids_by_folderid(self, folder_id): return secret_ids + def get_child_folder_ids_by_folderid(self, folder_id): + """Gets a list of child folder ids by folder_id + :param folder_id: the id of the folder + :type id: int + :return: a ``list`` of the child folder id's + :rtype: ``list`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the secret + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + + params = { + "filter.parentFolderId": folder_id, + "filter.limitToDirectDescendents": True, + } + params["take"] = 1 + endpoint_url = f"{self.api_url}/folders/lookup" + + params["take"] = self.process( + requests.get(endpoint_url, params=params, headers=self.headers()) + ).json()["total"] + # Handle result of zero child folders + if params["take"] != 0: + response = self.lookup_folders(query_params=params) + + try: + response = json.loads(response) + except json.JSONDecodeError: + raise SecretServerError(response) + + child_folder_ids = [] + for childFolder in response["records"]: + child_folder_ids.append(childFolder["id"]) + + return child_folder_ids + else: + return [] + class SecretServerV0(SecretServer): """A class that uses an *OAuth2 Bearer Token* to access the Secret Server diff --git a/tests/test_server.py b/tests/test_server.py index 96b8eff..af900a7 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -6,6 +6,7 @@ SecretServerClientError, SecretServerError, ServerSecret, + ServerFolder, ) @@ -31,7 +32,6 @@ def test_api_url(secret_server, env_vars): == f"https://{env_vars['tenant']}.secretservercloud.com/api/v1" ) - def test_access_token_authorizer(env_vars, authorizer): assert SecretServer( f"https://{env_vars['tenant']}.secretservercloud.com/", @@ -51,10 +51,28 @@ def test_server_secret_by_path(env_vars, secret_server): ).id == int(env_vars["secret_id"]) +def test_server_folder_by_path(env_vars, secret_server): + assert ServerFolder( + **secret_server.get_folder_by_path(env_vars["folder_path"]) + ).id == int(env_vars["folder_id"]) + + def test_nonexistent_secret(secret_server): with pytest.raises(SecretServerClientError): secret_server.get_secret(1000) +def test_nonexistent_folder(secret_server): + with pytest.raises(SecretServerClientError): + secret_server.get_folder(1000) + + def test_server_secret_ids_by_folderid(env_vars, secret_server): assert type(secret_server.get_secret_ids_by_folderid(env_vars["folder_id"])) is list + + +def test_server_child_folder_ids_by_folderid(env_vars, secret_server): + assert ( + type(secret_server.get_child_folder_ids_by_folderid(env_vars["folder_id"])) + is list + ) From 6cdbdd27794f2cf0e71ff47cad68b48d868eac22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Hebei=C3=9F?= Date: Wed, 12 Jun 2024 15:19:01 +0000 Subject: [PATCH 2/2] updated readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index ddcabd5..0ee3043 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ export TSS_TENANT=mytenant export TSS_SECRET_ID=42 export TSS_SECRET_PATH=\Test Secrets\SecretName export TSS_FOLDER_ID=1 +export TSS_FOLDER_PATH=\Test Secrets ``` The tests assume that the user associated with the specified `TSS_USERNAME` and `TSS_PASSWORD` can read the secret to be fetched, and that the Secret itself contains `username` and `password` fields.