Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provider appdynamics integration using access_token #2500

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions docs/providers/documentation/appdynamics-provider.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,26 @@ description: "AppDynamics provider allows you to get AppDynamics `alerts/actions
## Authentication Parameters
The AppDynamics provider requires the following authentication parameter:

- `AppDynamics Username`: Required. This is your AppDynamics account username.
- `AppDynamics Password`: This is the password associated with your AppDynamics Username.
- `AppDynamics Access Token`: Required if username/password is not provided for Bearer token authentication.
- `AppDynamics Username`: Required for Basic Auth authentication. This is your AppDynamics account username.
- `AppDynamics Password`: Required for Basic Auth authentication. This is the password associated with your AppDynamics Username.
- `AppDynamics Account Name`: This is your account's name.
- `App Id`: The Id of the Application in which you would like to install the webhook.
- `Host`: This is the hostname of the AppDynamics instance you wish to connect to. It identifies the AppDynamics server that the API will interact with.

## Connecting with the Provider
1. Ensure you have a AppDynamics account with the necessary [permissions](https://docs.appdynamics.com/accounts/en/cisco-appdynamics-on-premises-user-management/roles-and-permissions). The basic permissions required are `Account Owner` or `Administrator`. Alternatively you can create an account (instructions)[https://docs.appdynamics.com/accounts/en/global-account-administration/access-management/manage-user-accounts]

### Basic Auth authentication

Obtain AppDynamics Username and Password:
1. Ensure you have a AppDynamics account with the necessary [permissions](https://docs.appdynamics.com/accounts/en/cisco-appdynamics-on-premises-user-management/roles-and-permissions). The basic permissions required are `Account Owner` or `Administrator`. Alternatively you can create an account (instructions)[https://docs.appdynamics.com/accounts/en/global-account-administration/access-management/manage-user-accounts]
2. Find your account name [here](https://accounts.appdynamics.com/overview).
3. Determine the Host [here](https://accounts.appdynamics.com/overview).
4. Get the appId of the Appdynamics instance in which you wish to install the webhook into.
1. Find your account name [here](https://accounts.appdynamics.com/overview).

OR create Access Token:
1. Follow instructions [here](https://docs.appdynamics.com/appd/23.x/latest/en/extend-appdynamics/appdynamics-apis/api-clients)

1. Determine the Host [here](https://accounts.appdynamics.com/overview).
2. Get the appId of the Appdynamics instance in which you wish to install the webhook into.

## Webhook Integration Modifications

Expand Down
131 changes: 101 additions & 30 deletions keep/providers/appdynamics_provider/appdynamics_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import json
import tempfile
from pathlib import Path
from typing import List
from typing import List, Optional
from urllib.parse import urlencode, urljoin

import pydantic
import requests
from dateutil import parser

from keep.api.models.alert import AlertDto, AlertSeverity
from keep.contextmanager.contextmanager import ContextManager
Expand All @@ -28,29 +29,14 @@ class AppdynamicsProviderAuthConfig:
"""
AppDynamics authentication configuration.
"""

appDynamicsUsername: str = dataclasses.field(
metadata={
"required": True,
"description": "AppDynamics Username",
"hint": "Your Username",
},
)
appDynamicsAccountName: str = dataclasses.field(
metadata={
"required": True,
"description": "AppDynamics Account Name",
"hint": "AppDynamics Account Name",
},
)
appDynamicsPassword: str = dataclasses.field(
metadata={
"required": True,
"description": "Password",
"hint": "Password associated with your account",
"sensitive": True,
},
)

appId: str = dataclasses.field(
metadata={
"required": True,
Expand All @@ -66,6 +52,47 @@ class AppdynamicsProviderAuthConfig:
},
)

appDynamicsAccessToken: Optional[str] = dataclasses.field(
default=None,
metadata={
"description": "AppDynamics Access Token",
"hint": "Access Token",
"config_sub_group": "access_token",
"config_main_group": "authentication",
},
)

appDynamicsUsername: Optional[str] = dataclasses.field(
default=None,
metadata={
"description": "Username",
"hint": "Username associated with your account",
"config_sub_group": "basic_auth",
"config_main_group": "authentication",
},
)
appDynamicsPassword: Optional[str] = dataclasses.field(
default=None,
metadata={
"description": "Password",
"hint": "Password associated with your account",
"sensitive": True,
"config_sub_group": "basic_auth",
"config_main_group": "authentication",
},
)

@pydantic.root_validator
def check_password_or_token(cls, values):
username, password, token = (
values.get("appDynamicsUsername"),
values.get("appDynamicsPassword"),
values.get("appDynamicsAccessToken")
)
if not (username and password) and not token:
raise ValueError("Either username/password or access token must be provided")
return values


class AppdynamicsProvider(BaseProvider):
"""Install Webhooks and receive alerts from AppDynamics."""
Expand Down Expand Up @@ -121,7 +148,7 @@ def validate_config(self):
f"https://{self.authentication_config.host}"
)

def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs):
def __get_url(self, paths: List[str] = None, query_params: dict = None, **kwargs):
"""
Helper method to build the url for AppDynamics api requests.

Expand All @@ -133,6 +160,7 @@ def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs):

# url = https://baseballxyz.saas.appdynamics.com/rest/api/2/issue/createmeta?projectKeys=key1
"""
paths = paths or []

url = urljoin(
f"{self.authentication_config.host}/controller",
Expand All @@ -145,17 +173,41 @@ def __get_url(self, paths: List[str] = [], query_params: dict = None, **kwargs):

return url

def get_user_id_by_name(self, name: str) -> Optional[str]:
self.logger.info("Getting user ID by name")
response = requests.get(
url=self.__get_url(paths=["controller/api/rbac/v1/users/"]),
headers=self.__get_headers(),
auth=self.__get_auth(),
)
if response.ok:
users = response.json()
for user in users["users"]:
if user["name"].lower() == name.lower():
return user["id"]
return None
else:
self.logger.error(
"Error while validating scopes for AppDynamics", extra=response.json()
)

def validate_scopes(self) -> dict[str, bool | str]:
authenticated = False
administrator = "Missing Administrator Privileges"
self.logger.info("Validating AppDynamics Scopes")

user_id = self.get_user_id_by_name(self.authentication_config.appDynamicsAccountName)

url = self.__get_url(
paths=[
"controller/api/rbac/v1/users/",
user_id,
]
)

response = requests.get(
url=self.__get_url(
paths=[
"controller/api/rbac/v1/users/name",
self.authentication_config.appDynamicsUsername,
]
),
url=url,
headers=self.__get_headers(),
auth=self.__get_auth(),
)
if response.ok:
Expand All @@ -178,11 +230,19 @@ def validate_scopes(self) -> dict[str, bool | str]:

return {"authenticated": authenticated, "administrator": administrator}

def __get_headers(self):
if self.authentication_config.appDynamicsAccessToken:
return {
"Authorization": f"Bearer {self.authentication_config.appDynamicsAccessToken}",
}

def __get_auth(self) -> tuple[str, str]:
return (
f"{self.authentication_config.appDynamicsUsername}@{self.authentication_config.appDynamicsAccountName}",
self.authentication_config.appDynamicsPassword,
)
if self.authentication_config.appDynamicsUsername and self.authentication_config.appDynamicsPassword:
return (
f"{self.authentication_config.appDynamicsUsername}@{self.authentication_config.appDynamicsAccountName}",
self.authentication_config.appDynamicsPassword,
)


def __create_http_response_template(self, keep_api_url: str, api_key: str):
keep_api_host, keep_api_path = keep_api_url.rsplit("/", 1)
Expand All @@ -203,11 +263,12 @@ def __create_http_response_template(self, keep_api_url: str, api_key: str):
res = requests.post(
self.__get_url(paths=["controller/actiontemplate/httprequest"]),
files={"template": temp},
headers=self.__get_headers(),
auth=self.__get_auth(),
)
res = res.json()
temp.close()
if res["success"] == "True":
if res["success"] == "True" or res["success"] is True:
self.logger.info("HTTP Response template Successfully Created")
else:
self.logger.info("HTTP Response template creation failed", extra=res)
Expand All @@ -228,6 +289,7 @@ def __create_action(self):
"actions",
]
),
headers=self.__get_headers(),
auth=self.__get_auth(),
json={
"actionType": "HTTP_REQUEST",
Expand Down Expand Up @@ -272,6 +334,7 @@ def setup_webhook(
"policies",
]
),
headers=self.__get_headers(),
auth=self.__get_auth(),
)

Expand All @@ -290,6 +353,7 @@ def setup_webhook(
policy["id"],
]
),
headers=self.__get_headers(),
auth=self.__get_auth(),
).json()
if policy_config not in curr_policy["actions"]:
Expand All @@ -313,6 +377,7 @@ def setup_webhook(
policy["id"],
]
),
headers=self.__get_headers(),
auth=self.__get_auth(),
json=curr_policy,
)
Expand All @@ -329,10 +394,16 @@ def _format_alert(
id=event["id"],
name=event["name"],
severity=AppdynamicsProvider.SEVERITIES_MAP.get(event["severity"]),
lastReceived=event["lastReceived"],
lastReceived=parser.parse(event["lastReceived"]).isoformat(),
message=event["message"],
description=event["description"],
event_id=event["event_id"],
url=event["url"],
source=["appdynamics"],
)

@staticmethod
def parse_event_raw_body(raw_body: bytes | dict) -> dict:
if isinstance(raw_body, dict):
return raw_body
return json.loads(raw_body, strict=False)
Loading