From 5ec4144ecec21722fda54fda9c12c1bc1477503f Mon Sep 17 00:00:00 2001 From: delinea-rajani <164006534+delinea-rajani@users.noreply.github.com> Date: Mon, 1 Jul 2024 13:11:31 -0500 Subject: [PATCH] Update server.py --- delinea/secrets/server.py | 259 +++++++++++++++++++++++++++----------- 1 file changed, 184 insertions(+), 75 deletions(-) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 9766841..92b3a11 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -1,35 +1,28 @@ +ile line number Diff line number Diff line change """The Delinea Secret Server SDK API facilitates access to the Secret Server REST API using *OAuth2 Bearer Token* authentication. - Example: - # connect to Secret Server secret_server = SecretServer(base_url, authorizer, api_path_uri='/api/v1') # or, for Secret Server Cloud secret_server = SecretServerCloud(tenant, authorizer, tld='com') - # to get the secret as a ``dict`` secret = secret_server.get_secret(123) # or, to use the dataclass secret = ServerSecret(**secret_server.get_secret(123)) """ - import json import re from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta - import requests - - @dataclass class ServerSecret: # Based on https://gist.github.com/jaytaylor/3660565 @staticmethod def snake_case(camel_cased): """Transform to snake case - Transforms the keys of the given map from camelCase to snake_case. """ return [ @@ -41,7 +34,6 @@ def snake_case(camel_cased): ) for (k, v) in camel_cased.items() ] - @dataclass class Field: item_id: int @@ -52,7 +44,6 @@ class Field: filename: str value: str slug: str - def __init__(self, **kwargs): # The REST API returns attributes with camelCase names which we # replace with snake_case per Python conventions @@ -60,7 +51,6 @@ def __init__(self, **kwargs): if k == "item_value": k = "value" setattr(self, k, v) - id: int folder_id: int secret_template_id: int @@ -74,9 +64,7 @@ def __init__(self, **kwargs): last_heart_beat_check: datetime last_password_change_attempt: datetime fields: dict - DEFAULT_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S" - def __init__(self, **kwargs): # The REST API returns attributes with camelCase names which we replace # with snake_case per Python conventions @@ -94,94 +82,119 @@ def __init__(self, **kwargs): } +@dataclass +class ServerFolder: + # Based on https://gist.github.com/jaytaylor/3660565 + @staticmethod + def snake_case(camel_cased): + """Transform to snake case + Transforms the keys of the given map from camelCase to snake_case. + """ + return [ + ( + re.compile("([a-z0-9])([A-Z])") + .sub(r"\1_\2", re.compile(r"(.)([A-Z][a-z]+)").sub(r"\1_\2", k)) + .lower(), + v, + ) + for (k, v) in camel_cased.items() + ] + + @dataclass + class Field: + item_id: int + value: str + slug: str + + def __init__(self, **kwargs): + # The REST API returns attributes with camelCase names which we + # replace with snake_case per Python conventions + for k, v in ServerSecret.snake_case(kwargs): + if k == "item_value": + k = "value" + setattr(self, k, v) + + id: int + folder_name: str + folder_path: str + parent_folder_id: int + folder_type_id: int + secret_policy_id: int + inherit_secret_policy: bool + inherit_permissions: bool + child_folders: list + secret_templates: list + + def __init__(self, **kwargs): + # The REST API returns attributes with camelCase names which we replace + # with snake_case per Python conventions + for k, v in self.snake_case(kwargs): + setattr(self, k, v) + + class SecretServerError(Exception): """An Exception that includes a message and the server response""" def __init__(self, message, response=None, *args, **kwargs): self.message = message super().__init__(*args, **kwargs) - - class SecretServerClientError(SecretServerError): """An Exception that represents a client error i.e. ``400``.""" - - class SecretServerServiceError(SecretServerError): """An Exception that represents a service error i.e. ``500``.""" - - class Authorizer(ABC): """Main abstract base class for all Authorizer access methods.""" - @staticmethod def add_bearer_token_authorization_header(bearer_token, existing_headers={}): """Adds an HTTP `Authorization` header containing the `Bearer` token - :param existing_headers: a ``dict`` containing the existing headers :return: a ``dict`` containing the `existing_headers` and the `Authorization` header :rtype: ``dict`` """ - return { "Authorization": "Bearer " + bearer_token, **existing_headers, } - @abstractmethod def get_access_token(self): """Returns the access_token from a Grant Request""" - def headers(self, existing_headers={}): """Returns a dictionary containing headers for REST API calls""" return self.add_bearer_token_authorization_header( self.get_access_token(), existing_headers ) - - class AccessTokenAuthorizer(Authorizer): """Allows the use of a pre-existing access token to authorize REST API calls. """ - def get_access_token(self): return self.access_token - def __init__(self, access_token): self.access_token = access_token - - class PasswordGrantAuthorizer(Authorizer): """Allows the use of a username and password to be used to authorize REST API calls. """ - TOKEN_PATH_URI = "/oauth2/token" - @staticmethod def get_access_grant(token_url, grant_request): """Gets an *OAuth2 Access Grant* by calling the Secret Server REST API ``token`` endpoint - :raise :class:`SecretServerError` when the server returns anything other than a valid Access Grant """ - response = requests.post(token_url, grant_request) - try: # TSS returns a 200 (OK) containing HTML for some error conditions return json.loads(SecretServer.process(response).content) except json.JSONDecodeError: raise SecretServerError(response) - def _refresh(self, seconds_of_drift=300): """Refreshes the *OAuth2 Access Grant* if it has expired or will in the next `seconds_of_drift` seconds. - :raise :class:`SecretServerError` when the server returns anything other than a valid Access Grant """ - if ( hasattr(self, "access_grant") and self.access_grant_refreshed @@ -194,7 +207,6 @@ def _refresh(self, seconds_of_drift=300): self.token_url, self.grant_request ) self.access_grant_refreshed = datetime.now() - def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): self.token_url = base_url.rstrip("/") + "/" + token_path_uri.strip("/") self.grant_request = { @@ -202,15 +214,11 @@ def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): "password": password, "grant_type": "password", } - def get_access_token(self): self._refresh() return self.access_grant["access_token"] - - class DomainPasswordGrantAuthorizer(PasswordGrantAuthorizer): """Allows domain access to be used to authorize REST API calls.""" - def __init__( self, base_url, @@ -221,23 +229,17 @@ def __init__( ): super().__init__(base_url, username, password, token_path_uri=token_path_uri) self.grant_request["domain"] = domain - - class SecretServer: """A class that uses an *OAuth2 Bearer Token* to access the Secret Server REST API. It uses the and `Authorizer` to determine the Authorization method required to access the Secret Server at :attr:`base_url`. - It gets an ``access_token`` that it uses to create an *HTTP Authorization Header* which it includes in each REST API call. """ - API_PATH_URI = "/api/v1" - @staticmethod def process(response): """Process the response raising an error if the call was unsuccessful - :return: the response if the call was successful :rtype: :class:`~requests.Response` :raises: :class:`SecretServerAccessError` when the caller does not have @@ -245,7 +247,6 @@ def process(response): :raises: :class:`SecretsAccessError` when the server responses with any other error """ - if response.status_code >= 200 and response.status_code < 300: return response if response.status_code >= 400 and response.status_code < 500: @@ -260,11 +261,9 @@ def process(response): raise SecretServerClientError(message, response) else: raise SecretServerServiceError(response) - def headers(self): """Returns a dictionary containing HTTP headers.""" return self.authorizer.headers() - def __init__( self, base_url, @@ -279,14 +278,11 @@ def __init__( :param api_path_uri: Defaults to ``/api/v1`` :type api_path_uri: str """ - self.base_url = base_url.rstrip("/") self.authorizer = authorizer self.api_url = f"{self.base_url}/{api_path_uri.strip('/')}" - def get_secret_json(self, id, query_params=None): """Gets a Secret from Secret Server - :param id: the id of the secret :type id: int :param query_params: query parameters to pass to the endpoint @@ -299,6 +295,34 @@ def get_secret_json(self, id, query_params=None): any other reason """ endpoint_url = f"{self.api_url}/secrets/{id}" + if query_params is None: + return self.process(requests.get(endpoint_url, headers=self.headers())).text + else: + return self.process( + requests.get( + endpoint_url, + params=query_params, + headers=self.headers(), + ) + ).text + + def get_folder_json(self, id, query_params=None, get_all_children=True): + """Gets a Folder from Secret Server + :param id: the id of the folder + :type id: int + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a JSON formatted string representation of the folder + :rtype: ``str`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the folder + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + endpoint_url = f"{self.api_url}/folders/{id}" + + if get_all_children: + query_params["getAllChildren"] = "true" if query_params is None: return self.process(requests.get(endpoint_url, headers=self.headers())).text @@ -313,7 +337,6 @@ def get_secret_json(self, id, query_params=None): def get_secret(self, id, fetch_file_attachments=True, query_params=None): """Gets a secret - :param id: the id of the secret :type id: int :param fetch_file_attachments: whether or not to fetch file attachments @@ -329,14 +352,11 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - response = self.get_secret_json(id, query_params=query_params) - try: secret = json.loads(response) except json.JSONDecodeError: raise SecretServerError(response) - if fetch_file_attachments: for item in secret["items"]: if item["fileAttachmentId"]: @@ -355,9 +375,35 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None): ) return secret + def get_folder(self, id, query_params=None, get_all_children=False): + """Gets a folder + :param id: the id of the folder + :type id: int + :param getAllChildren: Whether to retrieve all child folders of the requested folder + :type fetch_file_attachments: bool + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a ``dict`` representation of the folder + :rtype: ``dict`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the folder + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + + response = self.get_folder_json( + id, query_params=query_params, get_all_children=get_all_children + ) + + try: + folder = json.loads(response) + except json.JSONDecodeError: + raise SecretServerError(response) + + return folder + def get_secret_by_path(self, secret_path, fetch_file_attachments=True): """Gets a secret by path - :param secret_path: full path of the secret :type secret_path: str :param fetch_file_attachments: whether or not to fetch file attachments @@ -368,7 +414,6 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True): :rtype: ``dict`` """ path = "\\" + re.sub(r"[\\/]+", r"\\", secret_path).lstrip("\\").rstrip("\\") - params = {"secretPath": path} return self.get_secret( id=0, @@ -376,9 +421,24 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True): query_params=params, ) + def get_folder_by_path(self, folder_path, get_all_children=True): + """Gets a folder by path + :param folder_path: full path of the folder + :type folder_path: str + :return: a ``dict`` representation of the folder + :rtype: ``dict`` + """ + path = "\\" + re.sub(r"[\\/]+", r"\\", folder_path).lstrip("\\").rstrip("\\") + + params = {"folderPath": path} + return self.get_folder( + id=0, + get_all_children=get_all_children, + query_params=params, + ) + def search_secrets(self, query_params=None): """Get Secrets from Secret Server - :param query_params: query parameters to pass to the endpoint :type query_params: dict :return: a JSON formatted string representation of the secrets @@ -389,6 +449,29 @@ def search_secrets(self, query_params=None): any other reason """ endpoint_url = f"{self.api_url}/secrets" + if query_params is None: + return self.process(requests.get(endpoint_url, headers=self.headers())).text + else: + return self.process( + requests.get( + endpoint_url, + params=query_params, + headers=self.headers(), + ) + ).text + + def lookup_folders(self, query_params=None): + """Lookup Folders from Secret Server + :param query_params: query parameters to pass to the endpoint + :type query_params: dict + :return: a JSON formatted string representation of the folders, containing only id and name + :rtype: ``str`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the secret + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + endpoint_url = f"{self.api_url}/folders/lookup" if query_params is None: return self.process(requests.get(endpoint_url, headers=self.headers())).text @@ -403,7 +486,6 @@ def search_secrets(self, query_params=None): def get_secret_ids_by_folderid(self, folder_id): """Gets a list of secrets ids by folder_id - :param folder_id: the id of the folder :type id: int :return: a ``list`` of the secret id's @@ -413,37 +495,70 @@ def get_secret_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ - params = {"filter.folderId": folder_id} endpoint_url = f"{self.api_url}/secrets/search-total" params["take"] = self.process( requests.get(endpoint_url, params=params, headers=self.headers()) ).text response = self.search_secrets(query_params=params) - try: secrets = json.loads(response) except json.JSONDecodeError: raise SecretServerError(response) - secret_ids = [] for secret in secrets["records"]: secret_ids.append(secret["id"]) return secret_ids + def get_child_folder_ids_by_folderid(self, folder_id): + """Gets a list of child folder ids by folder_id + :param folder_id: the id of the folder + :type id: int + :return: a ``list`` of the child folder id's + :rtype: ``list`` + :raise: :class:`SecretServerAccessError` when the caller does not have + permission to access the secret + :raise: :class:`SecretServerError` when the REST API call fails for + any other reason + """ + + params = { + "filter.parentFolderId": folder_id, + "filter.limitToDirectDescendents": True, + } + params["take"] = 1 + endpoint_url = f"{self.api_url}/folders/lookup" + + params["take"] = self.process( + requests.get(endpoint_url, params=params, headers=self.headers()) + ).json()["total"] + # Handle result of zero child folders + if params["take"] != 0: + response = self.lookup_folders(query_params=params) + + try: + response = json.loads(response) + except json.JSONDecodeError: + raise SecretServerError(response) + + child_folder_ids = [] + for childFolder in response["records"]: + child_folder_ids.append(childFolder["id"]) + + return child_folder_ids + else: + return [] + class SecretServerV0(SecretServer): """A class that uses an *OAuth2 Bearer Token* to access the Secret Server REST API. It uses the :attr:`username` and :attr:`password` to access the Secret Server at :attr:`base_url`. - It gets an ``access_token`` that it uses to create an *HTTP Authorization Header* which it includes in each REST API call. - This class maintains backwards compatibility with v0.0.5 """ - def __init__( self, base_url, @@ -459,21 +574,15 @@ def __init__( ), api_path_uri, ) - - class SecretServerCloud(SecretServer): """A class that uses bearer token authentication to access the Secret Server Cloud REST API. - It uses :attr:`tenant`, :attr:`tld` with :attr:`SERVER_URL_TEMPLATE`, to create request URLs. - It uses the :attr:`username` and :attr:`password` to get an access_token from Secret Server Cloud which it uses to make calls to the REST API. """ - DEFAULT_TLD = "com" URL_TEMPLATE = "https://{}.secretservercloud.{}" - def __init__(self, tenant, authorizer: Authorizer, tld=DEFAULT_TLD): super().__init__(self.URL_TEMPLATE.format(tenant, tld), authorizer)