diff --git a/docs/providers/documentation/appdynamics-provider.mdx b/docs/providers/documentation/appdynamics-provider.mdx index eef0e7a76..8948964be 100644 --- a/docs/providers/documentation/appdynamics-provider.mdx +++ b/docs/providers/documentation/appdynamics-provider.mdx @@ -7,19 +7,26 @@ description: "AppDynamics provider allows you to get AppDynamics `alerts/actions ## Authentication Parameters The AppDynamics provider requires the following authentication parameter: -- `AppDynamics Username`: Required. This is your AppDynamics account username. -- `AppDynamics Password`: This is the password associated with your AppDynamics Username. +- `AppDynamics Access Token`: Required if username/password is not provided for Bearer token authentication. +- `AppDynamics Username`: Required for Basic Auth authentication. This is your AppDynamics account username. +- `AppDynamics Password`: Required for Basic Auth authentication. This is the password associated with your AppDynamics Username. - `AppDynamics Account Name`: This is your account's name. - `App Id`: The Id of the Application in which you would like to install the webhook. - `Host`: This is the hostname of the AppDynamics instance you wish to connect to. It identifies the AppDynamics server that the API will interact with. ## Connecting with the Provider +1. Ensure you have a AppDynamics account with the necessary [permissions](https://docs.appdynamics.com/accounts/en/cisco-appdynamics-on-premises-user-management/roles-and-permissions). The basic permissions required are `Account Owner` or `Administrator`. Alternatively you can create an account (instructions)[https://docs.appdynamics.com/accounts/en/global-account-administration/access-management/manage-user-accounts] + +### Basic Auth authentication Obtain AppDynamics Username and Password: -1. Ensure you have a AppDynamics account with the necessary [permissions](https://docs.appdynamics.com/accounts/en/cisco-appdynamics-on-premises-user-management/roles-and-permissions). The basic permissions required are `Account Owner` or `Administrator`. Alternatively you can create an account (instructions)[https://docs.appdynamics.com/accounts/en/global-account-administration/access-management/manage-user-accounts] -2. Find your account name [here](https://accounts.appdynamics.com/overview). -3. Determine the Host [here](https://accounts.appdynamics.com/overview). -4. Get the appId of the Appdynamics instance in which you wish to install the webhook into. +1. Find your account name [here](https://accounts.appdynamics.com/overview). + +OR create Access Token: +1. Follow instructions [here](https://docs.appdynamics.com/appd/23.x/latest/en/extend-appdynamics/appdynamics-apis/api-clients) + +1. Determine the Host [here](https://accounts.appdynamics.com/overview). +2. Get the appId of the Appdynamics instance in which you wish to install the webhook into. ## Webhook Integration Modifications diff --git a/keep/providers/appdynamics_provider/appdynamics_provider.py b/keep/providers/appdynamics_provider/appdynamics_provider.py index d0f5e8867..84205eb83 100644 --- a/keep/providers/appdynamics_provider/appdynamics_provider.py +++ b/keep/providers/appdynamics_provider/appdynamics_provider.py @@ -6,11 +6,12 @@ import json import tempfile from pathlib import Path -from typing import List +from typing import List, Optional from urllib.parse import urlencode, urljoin import pydantic import requests +from dateutil import parser from keep.api.models.alert import AlertDto, AlertSeverity from keep.contextmanager.contextmanager import ContextManager @@ -28,14 +29,6 @@ class AppdynamicsProviderAuthConfig: """ AppDynamics authentication configuration. """ - - appDynamicsUsername: str = dataclasses.field( - metadata={ - "required": True, - "description": "AppDynamics Username", - "hint": "Your Username", - }, - ) appDynamicsAccountName: str = dataclasses.field( metadata={ "required": True, @@ -43,14 +36,7 @@ class AppdynamicsProviderAuthConfig: "hint": "AppDynamics Account Name", }, ) - appDynamicsPassword: str = dataclasses.field( - metadata={ - "required": True, - "description": "Password", - "hint": "Password associated with your account", - "sensitive": True, - }, - ) + appId: str = dataclasses.field( metadata={ "required": True, @@ -66,6 +52,47 @@ class AppdynamicsProviderAuthConfig: }, ) + appDynamicsAccessToken: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "AppDynamics Access Token", + "hint": "Access Token", + "config_sub_group": "access_token", + "config_main_group": "authentication", + }, + ) + + appDynamicsUsername: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "Username", + "hint": "Username associated with your account", + "config_sub_group": "basic_auth", + "config_main_group": "authentication", + }, + ) + appDynamicsPassword: Optional[str] = dataclasses.field( + default=None, + metadata={ + "description": "Password", + "hint": "Password associated with your account", + "sensitive": True, + "config_sub_group": "basic_auth", + "config_main_group": "authentication", + }, + ) + + @pydantic.root_validator + def check_password_or_token(cls, values): + username, password, token = ( + values.get("appDynamicsUsername"), + values.get("appDynamicsPassword"), + values.get("appDynamicsAccessToken") + ) + if not (username and password) and not token: + raise ValueError("Either username/password or access token must be provided") + return values + class AppdynamicsProvider(BaseProvider): """Install Webhooks and receive alerts from AppDynamics.""" @@ -121,7 +148,7 @@ def validate_config(self): f"https://{self.authentication_config.host}" ) - def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs): + def __get_url(self, paths: List[str] = None, query_params: dict = None, **kwargs): """ Helper method to build the url for AppDynamics api requests. @@ -133,6 +160,7 @@ def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs): # url = https://baseballxyz.saas.appdynamics.com/rest/api/2/issue/createmeta?projectKeys=key1 """ + paths = paths or [] url = urljoin( f"{self.authentication_config.host}/controller", @@ -145,17 +173,41 @@ def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs): return url + def get_user_id_by_name(self, name: str) -> Optional[str]: + self.logger.info("Getting user ID by name") + response = requests.get( + url=self.__get_url(paths=["controller/api/rbac/v1/users/"]), + headers=self.__get_headers(), + auth=self.__get_auth(), + ) + if response.ok: + users = response.json() + for user in users["users"]: + if user["name"].lower() == name.lower(): + return user["id"] + return None + else: + self.logger.error( + "Error while validating scopes for AppDynamics", extra=response.json() + ) + def validate_scopes(self) -> dict[str, bool | str]: authenticated = False administrator = "Missing Administrator Privileges" self.logger.info("Validating AppDynamics Scopes") + + user_id = self.get_user_id_by_name(self.authentication_config.appDynamicsAccountName) + + url = self.__get_url( + paths=[ + "controller/api/rbac/v1/users/", + user_id, + ] + ) + response = requests.get( - url=self.__get_url( - paths=[ - "controller/api/rbac/v1/users/name", - self.authentication_config.appDynamicsUsername, - ] - ), + url=url, + headers=self.__get_headers(), auth=self.__get_auth(), ) if response.ok: @@ -178,11 +230,19 @@ def validate_scopes(self) -> dict[str, bool | str]: return {"authenticated": authenticated, "administrator": administrator} + def __get_headers(self): + if self.authentication_config.appDynamicsAccessToken: + return { + "Authorization": f"Bearer {self.authentication_config.appDynamicsAccessToken}", + } + def __get_auth(self) -> tuple[str, str]: - return ( - f"{self.authentication_config.appDynamicsUsername}@{self.authentication_config.appDynamicsAccountName}", - self.authentication_config.appDynamicsPassword, - ) + if self.authentication_config.appDynamicsUsername and self.authentication_config.appDynamicsPassword: + return ( + f"{self.authentication_config.appDynamicsUsername}@{self.authentication_config.appDynamicsAccountName}", + self.authentication_config.appDynamicsPassword, + ) + def __create_http_response_template(self, keep_api_url: str, api_key: str): keep_api_host, keep_api_path = keep_api_url.rsplit("/", 1) @@ -203,11 +263,12 @@ def __create_http_response_template(self, keep_api_url: str, api_key: str): res = requests.post( self.__get_url(paths=["controller/actiontemplate/httprequest"]), files={"template": temp}, + headers=self.__get_headers(), auth=self.__get_auth(), ) res = res.json() temp.close() - if res["success"] == "True": + if res["success"] == "True" or res["success"] is True: self.logger.info("HTTP Response template Successfully Created") else: self.logger.info("HTTP Response template creation failed", extra=res) @@ -228,6 +289,7 @@ def __create_action(self): "actions", ] ), + headers=self.__get_headers(), auth=self.__get_auth(), json={ "actionType": "HTTP_REQUEST", @@ -272,6 +334,7 @@ def setup_webhook( "policies", ] ), + headers=self.__get_headers(), auth=self.__get_auth(), ) @@ -290,6 +353,7 @@ def setup_webhook( policy["id"], ] ), + headers=self.__get_headers(), auth=self.__get_auth(), ).json() if policy_config not in curr_policy["actions"]: @@ -313,6 +377,7 @@ def setup_webhook( policy["id"], ] ), + headers=self.__get_headers(), auth=self.__get_auth(), json=curr_policy, ) @@ -329,10 +394,16 @@ def _format_alert( id=event["id"], name=event["name"], severity=AppdynamicsProvider.SEVERITIES_MAP.get(event["severity"]), - lastReceived=event["lastReceived"], + lastReceived=parser.parse(event["lastReceived"]).isoformat(), message=event["message"], description=event["description"], event_id=event["event_id"], url=event["url"], source=["appdynamics"], ) + + @staticmethod + def parse_event_raw_body(raw_body: bytes | dict) -> dict: + if isinstance(raw_body, dict): + return raw_body + return json.loads(raw_body, strict=False)