From 4ed3f3cc6033def8296c246dc03cc54923a3cbb7 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Mon, 18 Nov 2024 13:19:45 +0400 Subject: [PATCH] provider: appdynamics provider now supports both username/password and access_token authentication methods --- .../appdynamics_provider.py | 80 ++++++++++++++----- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/keep/providers/appdynamics_provider/appdynamics_provider.py b/keep/providers/appdynamics_provider/appdynamics_provider.py index 020f1badf..51466700c 100644 --- a/keep/providers/appdynamics_provider/appdynamics_provider.py +++ b/keep/providers/appdynamics_provider/appdynamics_provider.py @@ -28,14 +28,6 @@ class AppdynamicsProviderAuthConfig: """ AppDynamics authentication configuration. """ - - appDynamicsAccessToken: str = dataclasses.field( - metadata={ - "required": True, - "description": "AppDynamics Access Token", - "hint": "Access Token", - }, - ) appDynamicsAccountName: str = dataclasses.field( metadata={ "required": True, @@ -43,14 +35,7 @@ class AppdynamicsProviderAuthConfig: "hint": "AppDynamics Account Name", }, ) - # appDynamicsPassword: str = dataclasses.field( - # metadata={ - # "required": True, - # "description": "Password", - # "hint": "Password associated with your account", - # "sensitive": True, - # }, - # ) + appId: str = dataclasses.field( metadata={ "required": True, @@ -66,6 +51,47 @@ class AppdynamicsProviderAuthConfig: }, ) + appDynamicsAccessToken: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "AppDynamics Access Token", + "hint": "Access Token", + "config_sub_group": "access_token", + "config_main_group": "authentication", + }, + ) + + appDynamicsUsername: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "Username", + "hint": "Username associated with your account", + "config_sub_group": "basic_auth", + "config_main_group": "authentication", + }, + ) + appDynamicsPassword: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "Password", + "hint": "Password associated with your account", + "sensitive": True, + "config_sub_group": "basic_auth", + "config_main_group": "authentication", + }, + ) + + @pydantic.root_validator + def check_password_or_token(cls, values): + username, password, token = ( + values.get("appDynamicsUsername"), + values.get("appDynamicsPassword"), + values.get("appDynamicsAccessToken") + ) + if not (username and password) and not token: + raise ValueError("Either username/password or access token must be provided") + return values + class AppdynamicsProvider(BaseProvider): """Install Webhooks and receive alerts from AppDynamics.""" @@ -151,6 +177,7 @@ def get_user_id_by_name(self, name: str) -> Optional[str]: response = requests.get( url=self.__get_url(paths=["controller/api/rbac/v1/users/"]), headers=self.__get_headers(), + auth=self.__get_auth(), ) if response.ok: users = response.json() @@ -180,6 +207,7 @@ def validate_scopes(self) -> dict[str, bool | str]: response = requests.get( url=url, headers=self.__get_headers(), + auth=self.__get_auth(), ) if response.ok: authenticated = True @@ -202,9 +230,18 @@ def validate_scopes(self) -> dict[str, bool | str]: return {"authenticated": authenticated, "administrator": administrator} def __get_headers(self): - return { - "Authorization": f"Bearer {self.authentication_config.appDynamicsAccessToken}", - } + if self.authentication_config.appDynamicsAccessToken: + return { + "Authorization": f"Bearer {self.authentication_config.appDynamicsAccessToken}", + } + + def __get_auth(self) -> tuple[str, str]: + if self.authentication_config.appDynamicsUsername and self.authentication_config.appDynamicsPassword: + return ( + f"{self.authentication_config.appDynamicsUsername}@{self.authentication_config.appDynamicsAccountName}", + self.authentication_config.appDynamicsPassword, + ) + def __create_http_response_template(self, keep_api_url: str, api_key: str): keep_api_host, keep_api_path = keep_api_url.rsplit("/", 1) @@ -226,6 +263,7 @@ def __create_http_response_template(self, keep_api_url: str, api_key: str): self.__get_url(paths=["controller/actiontemplate/httprequest"]), files={"template": temp}, headers=self.__get_headers(), + auth=self.__get_auth(), ) res = res.json() temp.close() @@ -251,6 +289,7 @@ def __create_action(self): ] ), headers=self.__get_headers(), + auth=self.__get_auth(), json={ "actionType": "HTTP_REQUEST", "name": "KeepAction", @@ -295,6 +334,7 @@ def setup_webhook( ] ), headers=self.__get_headers(), + auth=self.__get_auth(), ) policies = policies_response.json() @@ -313,6 +353,7 @@ def setup_webhook( ] ), headers=self.__get_headers(), + auth=self.__get_auth(), ).json() if policy_config not in curr_policy["actions"]: curr_policy["actions"].append(policy_config) @@ -336,6 +377,7 @@ def setup_webhook( ] ), headers=self.__get_headers(), + auth=self.__get_auth(), json=curr_policy, ) if not request.ok: