From c5ca9934fda8602aea5e045117537ab25026d97e Mon Sep 17 00:00:00 2001 From: delinea-rajani <164006534+delinea-rajani@users.noreply.github.com> Date: Mon, 1 Jul 2024 13:21:13 -0500 Subject: [PATCH] Update server.py --- delinea/secrets/server.py | 87 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 92b3a11..4d50a19 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -1,28 +1,35 @@ -ile line number Diff line number Diff line change """The Delinea Secret Server SDK API facilitates access to the Secret Server REST API using *OAuth2 Bearer Token* authentication. + Example: + # connect to Secret Server secret_server = SecretServer(base_url, authorizer, api_path_uri='/api/v1') # or, for Secret Server Cloud secret_server = SecretServerCloud(tenant, authorizer, tld='com') + # to get the secret as a ``dict`` secret = secret_server.get_secret(123) # or, to use the dataclass secret = ServerSecret(**secret_server.get_secret(123)) """ + import json import re from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta + import requests + + @dataclass class ServerSecret: # Based on https://gist.github.com/jaytaylor/3660565 @staticmethod def snake_case(camel_cased): """Transform to snake case + Transforms the keys of the given map from camelCase to snake_case. """ return [ @@ -34,6 +41,7 @@ def snake_case(camel_cased): ) for (k, v) in camel_cased.items() ] + @dataclass class Field: item_id: int @@ -44,6 +52,7 @@ class Field: filename: str value: str slug: str + def __init__(self, **kwargs): # The REST API returns attributes with camelCase names which we # replace with snake_case per Python conventions @@ -51,6 +60,7 @@ def __init__(self, **kwargs): if k == "item_value": k = "value" setattr(self, k, v) + id: int folder_id: int secret_template_id: int @@ -64,7 +74,9 @@ def __init__(self, **kwargs): last_heart_beat_check: datetime last_password_change_attempt: datetime fields: dict + DEFAULT_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S" + def __init__(self, **kwargs): # The REST API returns attributes with camelCase names which we replace # with snake_case per Python conventions @@ -88,6 +100,7 @@ class ServerFolder: @staticmethod def snake_case(camel_cased): """Transform to snake case + Transforms the keys of the given map from camelCase to snake_case. """ return [ @@ -138,63 +151,88 @@ class SecretServerError(Exception): def __init__(self, message, response=None, *args, **kwargs): self.message = message super().__init__(*args, **kwargs) + + class SecretServerClientError(SecretServerError): """An Exception that represents a client error i.e. ``400``.""" + + class SecretServerServiceError(SecretServerError): """An Exception that represents a service error i.e. ``500``.""" + + class Authorizer(ABC): """Main abstract base class for all Authorizer access methods.""" + @staticmethod def add_bearer_token_authorization_header(bearer_token, existing_headers={}): """Adds an HTTP `Authorization` header containing the `Bearer` token + :param existing_headers: a ``dict`` containing the existing headers :return: a ``dict`` containing the `existing_headers` and the `Authorization` header :rtype: ``dict`` """ + return { "Authorization": "Bearer " + bearer_token, **existing_headers, } + @abstractmethod def get_access_token(self): """Returns the access_token from a Grant Request""" + def headers(self, existing_headers={}): """Returns a dictionary containing headers for REST API calls""" return self.add_bearer_token_authorization_header( self.get_access_token(), existing_headers ) + + class AccessTokenAuthorizer(Authorizer): """Allows the use of a pre-existing access token to authorize REST API calls. """ + def get_access_token(self): return self.access_token + def __init__(self, access_token): self.access_token = access_token + + class PasswordGrantAuthorizer(Authorizer): """Allows the use of a username and password to be used to authorize REST API calls. """ + TOKEN_PATH_URI = "/oauth2/token" + @staticmethod def get_access_grant(token_url, grant_request): """Gets an *OAuth2 Access Grant* by calling the Secret Server REST API ``token`` endpoint + :raise :class:`SecretServerError` when the server returns anything other than a valid Access Grant """ + response = requests.post(token_url, grant_request) + try: # TSS returns a 200 (OK) containing HTML for some error conditions return json.loads(SecretServer.process(response).content) except json.JSONDecodeError: raise SecretServerError(response) + def _refresh(self, seconds_of_drift=300): """Refreshes the *OAuth2 Access Grant* if it has expired or will in the next `seconds_of_drift` seconds. + :raise :class:`SecretServerError` when the server returns anything other than a valid Access Grant """ + if ( hasattr(self, "access_grant") and self.access_grant_refreshed @@ -207,6 +245,7 @@ def _refresh(self, seconds_of_drift=300): self.token_url, self.grant_request ) self.access_grant_refreshed = datetime.now() + def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): self.token_url = base_url.rstrip("/") + "/" + token_path_uri.strip("/") self.grant_request = { @@ -214,11 +253,15 @@ def __init__(self, base_url, username, password, token_path_uri=TOKEN_PATH_URI): "password": password, "grant_type": "password", } + def get_access_token(self): self._refresh() return self.access_grant["access_token"] + + class DomainPasswordGrantAuthorizer(PasswordGrantAuthorizer): """Allows domain access to be used to authorize REST API calls.""" + def __init__( self, base_url, @@ -229,17 +272,23 @@ def __init__( ): super().__init__(base_url, username, password, token_path_uri=token_path_uri) self.grant_request["domain"] = domain + + class SecretServer: """A class that uses an *OAuth2 Bearer Token* to access the Secret Server REST API. It uses the and `Authorizer` to determine the Authorization method required to access the Secret Server at :attr:`base_url`. + It gets an ``access_token`` that it uses to create an *HTTP Authorization Header* which it includes in each REST API call. """ + API_PATH_URI = "/api/v1" + @staticmethod def process(response): """Process the response raising an error if the call was unsuccessful + :return: the response if the call was successful :rtype: :class:`~requests.Response` :raises: :class:`SecretServerAccessError` when the caller does not have @@ -247,6 +296,7 @@ def process(response): :raises: :class:`SecretsAccessError` when the server responses with any other error """ + if response.status_code >= 200 and response.status_code < 300: return response if response.status_code >= 400 and response.status_code < 500: @@ -261,9 +311,11 @@ def process(response): raise SecretServerClientError(message, response) else: raise SecretServerServiceError(response) + def headers(self): """Returns a dictionary containing HTTP headers.""" return self.authorizer.headers() + def __init__( self, base_url, @@ -278,11 +330,14 @@ def __init__( :param api_path_uri: Defaults to ``/api/v1`` :type api_path_uri: str """ + self.base_url = base_url.rstrip("/") self.authorizer = authorizer self.api_url = f"{self.base_url}/{api_path_uri.strip('/')}" + def get_secret_json(self, id, query_params=None): """Gets a Secret from Secret Server + :param id: the id of the secret :type id: int :param query_params: query parameters to pass to the endpoint @@ -295,6 +350,7 @@ def get_secret_json(self, id, query_params=None): any other reason """ endpoint_url = f"{self.api_url}/secrets/{id}" + if query_params is None: return self.process(requests.get(endpoint_url, headers=self.headers())).text else: @@ -308,6 +364,7 @@ def get_secret_json(self, id, query_params=None): def get_folder_json(self, id, query_params=None, get_all_children=True): """Gets a Folder from Secret Server + :param id: the id of the folder :type id: int :param query_params: query parameters to pass to the endpoint @@ -337,6 +394,7 @@ def get_folder_json(self, id, query_params=None, get_all_children=True): def get_secret(self, id, fetch_file_attachments=True, query_params=None): """Gets a secret + :param id: the id of the secret :type id: int :param fetch_file_attachments: whether or not to fetch file attachments @@ -352,11 +410,14 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + response = self.get_secret_json(id, query_params=query_params) + try: secret = json.loads(response) except json.JSONDecodeError: raise SecretServerError(response) + if fetch_file_attachments: for item in secret["items"]: if item["fileAttachmentId"]: @@ -377,6 +438,7 @@ def get_secret(self, id, fetch_file_attachments=True, query_params=None): def get_folder(self, id, query_params=None, get_all_children=False): """Gets a folder + :param id: the id of the folder :type id: int :param getAllChildren: Whether to retrieve all child folders of the requested folder @@ -404,6 +466,7 @@ def get_folder(self, id, query_params=None, get_all_children=False): def get_secret_by_path(self, secret_path, fetch_file_attachments=True): """Gets a secret by path + :param secret_path: full path of the secret :type secret_path: str :param fetch_file_attachments: whether or not to fetch file attachments @@ -414,6 +477,7 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True): :rtype: ``dict`` """ path = "\\" + re.sub(r"[\\/]+", r"\\", secret_path).lstrip("\\").rstrip("\\") + params = {"secretPath": path} return self.get_secret( id=0, @@ -423,11 +487,13 @@ def get_secret_by_path(self, secret_path, fetch_file_attachments=True): def get_folder_by_path(self, folder_path, get_all_children=True): """Gets a folder by path + :param folder_path: full path of the folder :type folder_path: str :return: a ``dict`` representation of the folder :rtype: ``dict`` """ + # print(f"========================================"+folder_path) path = "\\" + re.sub(r"[\\/]+", r"\\", folder_path).lstrip("\\").rstrip("\\") params = {"folderPath": path} @@ -439,6 +505,7 @@ def get_folder_by_path(self, folder_path, get_all_children=True): def search_secrets(self, query_params=None): """Get Secrets from Secret Server + :param query_params: query parameters to pass to the endpoint :type query_params: dict :return: a JSON formatted string representation of the secrets @@ -449,6 +516,7 @@ def search_secrets(self, query_params=None): any other reason """ endpoint_url = f"{self.api_url}/secrets" + if query_params is None: return self.process(requests.get(endpoint_url, headers=self.headers())).text else: @@ -462,6 +530,7 @@ def search_secrets(self, query_params=None): def lookup_folders(self, query_params=None): """Lookup Folders from Secret Server + :param query_params: query parameters to pass to the endpoint :type query_params: dict :return: a JSON formatted string representation of the folders, containing only id and name @@ -486,6 +555,7 @@ def lookup_folders(self, query_params=None): def get_secret_ids_by_folderid(self, folder_id): """Gets a list of secrets ids by folder_id + :param folder_id: the id of the folder :type id: int :return: a ``list`` of the secret id's @@ -495,16 +565,19 @@ def get_secret_ids_by_folderid(self, folder_id): :raise: :class:`SecretServerError` when the REST API call fails for any other reason """ + params = {"filter.folderId": folder_id} endpoint_url = f"{self.api_url}/secrets/search-total" params["take"] = self.process( requests.get(endpoint_url, params=params, headers=self.headers()) ).text response = self.search_secrets(query_params=params) + try: secrets = json.loads(response) except json.JSONDecodeError: raise SecretServerError(response) + secret_ids = [] for secret in secrets["records"]: secret_ids.append(secret["id"]) @@ -555,10 +628,13 @@ class SecretServerV0(SecretServer): """A class that uses an *OAuth2 Bearer Token* to access the Secret Server REST API. It uses the :attr:`username` and :attr:`password` to access the Secret Server at :attr:`base_url`. + It gets an ``access_token`` that it uses to create an *HTTP Authorization Header* which it includes in each REST API call. + This class maintains backwards compatibility with v0.0.5 """ + def __init__( self, base_url, @@ -574,15 +650,24 @@ def __init__( ), api_path_uri, ) + + class SecretServerCloud(SecretServer): """A class that uses bearer token authentication to access the Secret Server Cloud REST API. + It uses :attr:`tenant`, :attr:`tld` with :attr:`SERVER_URL_TEMPLATE`, to create request URLs. + It uses the :attr:`username` and :attr:`password` to get an access_token from Secret Server Cloud which it uses to make calls to the REST API. """ + DEFAULT_TLD = "com" URL_TEMPLATE = "https://{}.secretservercloud.{}" + def __init__(self, tenant, authorizer: Authorizer, tld=DEFAULT_TLD): super().__init__(self.URL_TEMPLATE.format(tenant, tld), authorizer) + + +