Skip to content

Commit

Permalink
Dnac (#69)
Browse files Browse the repository at this point in the history
* initial commit for dnac support

* added catalystcenter support

* reformat code

* change in endpoints_ise

* remove methods only applicable to ise from cc
  • Loading branch information
kuba-mazurkiewicz authored Feb 3, 2025
1 parent c8b91c5 commit f4f1a3b
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*.json
#/endpoints*.yaml
endpoints*.yaml
!examples/*.json
!examples/endpoints*.yaml
tmp/
Expand Down
125 changes: 125 additions & 0 deletions examples/endpoints_catalystcenter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
- name: transit_network
endpoint: /dna/intent/api/v1/sda/transitNetworks
- name: credentials_snmpv2_write
endpoint: /dna/intent/api/v2/global-credential
# - name: fabric_authentication_profile
# endpoint: /dna/intent/api/v1/business/sda/authentication-profile
# - name: building
# endpoint: /dna/intent/api/v1/site
# - name: fabric_l2_handoff
# endpoint: /dna/intent/api/v1/sda/fabricDevices/layer2Handoffs
# - name: floor
# endpoint: /dna/intent/api/v1/site
# - name: fabric_virtual_network
# endpoint: /dna/intent/api/v1/virtual-network
# - name: role
# endpoint: /dna/system/api/v1/role
# - name: credentials_cli
# endpoint: /dna/intent/api/v2/global-credential
# - name: image_distribution
# endpoint: /dna/intent/api/v1/image/distribution
# - name: discovery
# endpoint: /dna/intent/api/v1/discovery
# - name: wireless_rf_profile
# endpoint: /dna/intent/api/v1/wireless/rf-profile
# - name: area
# endpoint: /dna/intent/api/v1/site
# - name: pnp_device_claim_site
# endpoint: /dna/intent/api/v1/onboarding/pnp-device/site-claim
# - name: credentials_https_write
# endpoint: /dna/intent/api/v2/global-credential
# - name: template_version
# endpoint: /dna/intent/api/v1/template-programmer/template/version
# - name: ip_pool_reservation
# endpoint: /dna/intent/api/v1/reserve-ip-subpool
# - name: ip_pool
# endpoint: /api/v2/ippool
- name: credentials_snmpv2_read
endpoint: /dna/intent/api/v2/global-credential
# - name: pnp_import_devices
# endpoint: /dna/intent/api/v1/onboarding/pnp-device/import
# - name: fabric_device
# endpoint: /dna/intent/api/v1/sda/fabricDevices
# - name: device_role
# endpoint: /dna/intent/api/v1/network-device/brief
# - name: wireless_device_provision
# endpoint: /dna/intent/api/v1/wireless/provision
# - name: device
# endpoint: /dna/intent/api/v1/network-device
# - name: fabric_port_assignment
# endpoint: /dna/intent/api/v1/sda/portAssignments
# - name: sp_profile
# endpoint: /dna/intent/api/v2/service-provider
# - name: network
# endpoint: /dna/intent/api/v2/network
- name: credentials_https_read
endpoint: /dna/intent/api/v2/global-credential
- name: credentials_snmpv3
endpoint: /dna/intent/api/v2/global-credential
# - name: assign_credentials
# endpoint: /dna/intent/api/v2/credential-to-site
# - name: device_detail
# endpoint: /dna/intent/api/v1/device-detail
# - name: virtual_network_ip_pool
# endpoint: /dna/intent/api/v1/business/sda/virtualnetwork/ippool
# - name: fabric_site
# endpoint: /dna/intent/api/v1/sda/fabricSites
# - name: lan_automation
# endpoint: /dna/intent/api/v1/lan-automation
# - name: user
# endpoint: /dna/system/api/v1/user
# - name: virtual_network_to_fabric_site
# endpoint: /dna/intent/api/v1/business/sda/virtual-network
# - name: wireless_enterprise_ssid
# endpoint: /dna/intent/api/v1/enterprise-ssid
# - name: authentication_policy_server
# endpoint: /dna/intent/api/v1/authentication-policy-servers
# - name: image
# endpoint: /dna/intent/api/v1/image/importation/source/file
# - name: network_profile
# endpoint: /api/v1/siteprofile
# - name: fabric_l3_handoff_ip_transit
# endpoint: /dna/intent/api/v1/sda/fabricDevices/layer3Handoffs/ipTransits
# - name: pnp_config_preview
# endpoint: /dna/intent/api/v1/onboarding/pnp-device/site-config-preview
# - name: image_activation
# endpoint: /dna/intent/api/v1/image/activation/device
# - name: wireless_profile
# endpoint: /intent/api/v1/wirelessProfiles
# - name: fabric_provision_device
# endpoint: /dna/intent/api/v1/sda/provisionDevices
# - name: deploy_template
# endpoint: /dna/intent/api/v2/template-programmer/template/deploy
# - name: pnp_device
# endpoint: /dna/intent/api/v1/onboarding/pnp-device
- name: anycast_gateway
endpoint: /dna/intent/api/v1/sda/anycastGateways
# - name: network_devices
# endpoint: /dna/intent/api/v1/network-device
# - name: site
# endpoint: /dna/intent/api/v1/sites
# children:
# - name: wireless_ssid
# endpoint: /wirelessSettings/ssids
# - name: aaa_settings
# endpoint: /aaaSettings
# - name: project
# endpoint: /dna/intent/api/v1/template-programmer/project
# children:
# - name: template
# endpoint: /template
# - name:
# endpoint: /dna/intent/api/v1/sda/fabrics
# children:
# - name: fabric_vlan_to_ssid
# endpoint: /vlanToSsids
# - name:
# endpoint: /dna/intent/api/v1/networkprofile
# children:
# - name: associate_site_to_network_profile
# endpoint: /site/%v
# - name: tag
# endpoint: /dna/intent/api/v1/tag
# children:
# - name: assign_templates_to_tag
# endpoint: /member
250 changes: 250 additions & 0 deletions nac_collector/cisco_client_catalystcenter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
import logging

import click
import requests
import urllib3

from nac_collector.cisco_client import CiscoClient

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger("main")

# Suppress urllib3 warnings
logging.getLogger("urllib3").setLevel(logging.ERROR)


class CiscoClientCATALYSTCENTER(CiscoClient):
"""
This class inherits from the abstract class CiscoClient. It's used for authenticating
with the Cisco Catalyst Center API and retrieving data from various endpoints.
Authentication is username/password based and a session is created upon successful
authentication for subsequent requests.
"""

DNAC_AUTH_ENDPOINT = "/dna/system/api/v1/auth/token"
SOLUTION = "catalystcenter"

"Used for mapping credentials to the correct endpoint"
mappings = {
"credentials_snmpv3": "snmpV3",
"credentials_snmpv2_read": "snmpV2cRead",
"credentials_snmpv2_write": "snmpV2cWrite",
"credentials_cli": "cliCredential",
"credentials_https_read": "httpsRead",
"credentials_https_write": "httpsWrite",
}

def __init__(
self,
username,
password,
base_url,
max_retries,
retry_after,
timeout,
ssl_verify,
):
super().__init__(
username, password, base_url, max_retries, retry_after, timeout, ssl_verify
)

def authenticate(self):
"""
Perform token-based authentication.
Returns:
bool: True if authentication is successful, False otherwise.
"""

auth_url = f"{self.base_url}{self.DNAC_AUTH_ENDPOINT}"

headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": "application/json",
}
response = requests.post(
auth_url,
auth=(self.username, self.password),
headers=headers,
verify=self.ssl_verify,
timeout=self.timeout,
)

if response and response.status_code == 200:
logger.info("Authentication Successful for URL: %s", auth_url)

token = response.json()["Token"]

# Create a session after successful authentication
self.session = requests.Session()
self.session.headers.update(
{
"Content-Type": "application/json",
"x-auth-token": token,
}
)
return True

logger.error(
"Authentication failed with status code: %s",
response.status_code,
)
return False

def process_endpoint_data(self, endpoint, endpoint_dict, data):
"""
Process the data for a given endpoint and update the endpoint_dict.
Parameters:
endpoint (dict): The endpoint configuration.
endpoint_dict (dict): The dictionary to store processed data.
data (dict or list): The data fetched from the endpoint.
Returns:
dict: The updated endpoint dictionary with processed data.
"""

if data is None:
endpoint_dict[endpoint["name"]].append(
{"data": {}, "endpoint": endpoint["endpoint"]}
)

# License API returns a list of dictionaries
elif isinstance(data, list):
endpoint_dict[endpoint["name"]].append(
{"data": data, "endpoint": endpoint["endpoint"]}
)
elif isinstance(data.get("response"), dict):
for k, v in data.get("response").items():
if self.mappings[endpoint["name"]] == k:
for i in v:
endpoint_dict[endpoint["name"]].append(
{
"data": i,
"endpoint": endpoint["endpoint"]
+ "/"
+ self.get_id_value(i),
}
)
elif data.get("response"):
for i in data.get("response"):
endpoint_dict[endpoint["name"]].append(
{
"data": i,
"endpoint": endpoint["endpoint"] + "/" + self.get_id_value(i),
}
)

return endpoint_dict # Return the processed endpoint dictionary

def get_from_endpoints(self, endpoints_yaml_file):
"""
Retrieve data from a list of endpoints specified in a YAML file and
run GET requests to download data from controller.
Parameters:
endpoints_yaml_file (str): The name of the YAML file containing the endpoints.
Returns:
dict: The final dictionary containing the data retrieved from the endpoints.
"""

# Load endpoints from the YAML file
logger.info("Loading endpoints from %s", endpoints_yaml_file)
with open(endpoints_yaml_file, "r", encoding="utf-8") as f:
endpoints = self.yaml.load(f)

# Initialize an empty dictionary
final_dict = {}

# Iterate over all endpoints
with click.progressbar(endpoints, label="Processing endpoints") as endpoint_bar:
for endpoint in endpoint_bar:
logger.info("Processing endpoint: %s", endpoint["name"])

endpoint_dict = CiscoClient.create_endpoint_dict(endpoint)

data = self.fetch_data(endpoint["endpoint"])

# Process the endpoint data and get the updated dictionary
endpoint_dict = self.process_endpoint_data(
endpoint, endpoint_dict, data
)

if endpoint.get("children"):
# Create empty list of parent_endpoint_ids
parent_endpoint_ids = []

for item in endpoint_dict[endpoint["name"]]:
# Add the item's id to the list
try:
parent_endpoint_ids.append(item["data"]["id"])
except KeyError:
continue

for children_endpoint in endpoint["children"]:
logger.info(
"Processing children endpoint: %s",
endpoint["endpoint"]
+ "/%v"
+ children_endpoint["endpoint"],
)

# Iterate over the parent endpoint ids
for id_ in parent_endpoint_ids:
children_endpoint_dict = CiscoClient.create_endpoint_dict(
children_endpoint
)

# Replace '%v' in the endpoint with the id
children_joined_endpoint = (
endpoint["endpoint"]
+ "/"
+ id_
+ children_endpoint["endpoint"]
)

data = self.fetch_data(children_joined_endpoint)

# Process the children endpoint data and get the updated dictionary
children_endpoint_dict = self.process_endpoint_data(
children_endpoint, children_endpoint_dict, data
)

for index, value in enumerate(
endpoint_dict[endpoint["name"]]
):
if value.get("data").get("id") == id_:
endpoint_dict[endpoint["name"]][index].setdefault(
"children", {}
)[
children_endpoint["name"]
] = children_endpoint_dict[
children_endpoint["name"]
]

# Save results to dictionary
final_dict.update(endpoint_dict)
return final_dict

@staticmethod
def get_id_value(i):
"""
Attempts to get the 'id' or 'name' value from a dictionary.
Parameters:
i (dict): The dictionary to get the 'id' or 'name' value from.
Returns:
str or None: The 'id' or 'name' value if it exists, None otherwise.
"""
try:
id_value = i["id"]
except KeyError:
try:
id_value = i["name"]
except KeyError:
id_value = None

return id_value
Loading

0 comments on commit f4f1a3b

Please sign in to comment.