Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Refactor Nautobot api client and update logging / exception handling #666

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 70 additions & 83 deletions python/neutron-understack/neutron_understack/nautobot.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,78 @@
import inspect
import pathlib
from pprint import pformat
from urllib.parse import urljoin
from uuid import UUID

import requests
from neutron_lib import exceptions as exc
from oslo_log import log
from requests.models import HTTPError

LOG = log.getLogger(__name__)


class NautobotError(exc.NeutronException):
message = "Nautobot error"
class NautobotRequestError(exc.NeutronException):
message = "Nautobot API returned error %(code)s for %(url)s: %(body)s"


class NautobotNotFoundError(NautobotError):
message = "%(obj)s not found in Nautobot. ref=%(ref)s"
class NautobotOSError(exc.NeutronException):
message = "Error occurred querying Nautobot: %(err)s"


class Nautobot:
CALLER_FRAME = 1
class NautobotNotFoundError(exc.NeutronException):
message = "%(obj)s not found in Nautobot. ref=%(ref)s"

def __init__(self, nb_url: str | None = None, nb_token: str | None = None):
"""Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs."""
self.base_url = nb_url or "http://nautobot-default.nautobot.svc.cluster.local"
self.token = nb_token or self._fetch_nb_token()
self.s = requests.Session()
self.s.headers.update({"Authorization": f"Token {self.token}"})

def _fetch_nb_token(self):
file = pathlib.Path("/etc/nb-token/token")
with file.open() as f:
return f.read().strip()
class Nautobot:
"""Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs."""

def _log_and_raise_for_status(self, response):
try:
response.raise_for_status()
except HTTPError as error:
LOG.error("Nautobot error: %s %s", error, response.content)
raise NautobotError() from error
def __init__(self, nb_url: str, nb_token: str):
self.base_url = nb_url
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Token {nb_token}"})

def make_api_request(
self, url: str, method: str, payload: dict | None = None, params=None
self,
method: str,
url: str,
payload: dict | None = None,
params: dict[str, str] | None = None,
timeout: int = 10,
) -> dict:
endpoint_url = urljoin(self.base_url, url)
caller_function = inspect.stack()[self.CALLER_FRAME].function
http_method = method.upper()
full_url = urljoin(self.base_url, url)

LOG.debug(
"%(caller_function)s payload: %(payload)s",
{"payload": payload, "caller_function": caller_function},
)
resp = self.s.request(
http_method,
endpoint_url,
timeout=10,
json=payload,
params=params,
allow_redirects=False,
)

if resp.content:
resp_data = resp.json()
else:
resp_data = {"status_code": resp.status_code}
try:
response = self.session.request(
method,
full_url,
timeout=timeout,
json=payload,
params=params,
allow_redirects=False,
)
except Exception as e:
raise NautobotOSError(err=e) from e

if response.status_code >= 300:
raise NautobotRequestError(
code=response.status_code, url=full_url, body=response.content
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
code=response.status_code, url=full_url, body=response.content
code=response.status_code, url=full_url, body=response.content

long term you'll probably want the response.json()["error"] by default.

)
if not response.content:
response_data = {"status_code": response.status_code}
try:
response_data = response.json()
except requests.exceptions.JSONDecodeError:
response_data = {"body": response.content}

caller_function = inspect.stack()[1].function
LOG.debug(
"%(caller_function)s resp: %(resp)s",
{"resp": resp_data, "caller_function": caller_function},
"[%s] %s %s %s ==> %s",
caller_function,
full_url,
method,
pformat(payload),
pformat(response_data),
)
self._log_and_raise_for_status(resp)
return resp_data
return response_data

def ucvni_create(
self,
Expand All @@ -92,40 +93,35 @@ def ucvni_create(
payload["ucvni_type"] = "INFRA"

url = "/api/plugins/undercloud-vni/ucvnis/"
resp_data = self.make_api_request(url, "post", payload)
return resp_data
return self.make_api_request("POST", url, payload)

def ucvni_delete(self, network_id):
url = f"/api/plugins/undercloud-vni/ucvnis/{network_id}/"
return self.make_api_request(url, "delete")
return self.make_api_request("DELETE", url)

def fetch_namespace_by_name(self, name: str) -> str:
url = f"/api/ipam/namespaces/?name={name}&depth=1"
resp_data = self.make_api_request(url, "get")
resp_data = self.make_api_request("GET", url)
try:
return resp_data["results"][0]["id"]
except (IndexError, KeyError) as error:
LOG.error("Nautobot error: %(error)s", {"error": error})
raise NautobotNotFoundError(obj="namespace", ref=name) from error

def namespace_create(self, name: str) -> dict:
url = "/api/ipam/namespaces/"
payload = {"name": name}
return self.make_api_request(url, "post", payload)
return self.make_api_request("POST", "/api/ipam/namespaces/", payload)

def namespace_delete(self, namespace_uuid: str) -> dict:
url = f"/api/ipam/namespaces/{namespace_uuid}/"
return self.make_api_request(url, "delete")
def namespace_delete(self, uuid: str) -> dict:
return self.make_api_request("DELETE", f"/api/ipam/namespaces/{uuid}/")

def subnet_create(self, subnet_uuid: str, prefix: str, namespace_name: str) -> dict:
url = "/api/ipam/prefixes/"
payload = {
"id": subnet_uuid,
"prefix": prefix,
"status": "Active",
"namespace": {"name": namespace_name},
}
return self.make_api_request(url, "post", payload)
return self.make_api_request("POST", "/api/ipam/prefixes/", payload)

def associate_subnet_with_network(
self, network_uuid: str, subnet_uuid: str, role: str
Expand All @@ -141,11 +137,10 @@ def associate_subnet_with_network(
},
},
}
self.make_api_request(url, "patch", payload)
self.make_api_request("PATCH", url, payload)

def subnet_delete(self, subnet_uuid: str) -> dict:
url = f"/api/ipam/prefixes/{subnet_uuid}/"
return self.make_api_request(url, "delete")
def subnet_delete(self, uuid: str) -> dict:
return self.make_api_request("DELETE", f"/api/ipam/prefixes/{uuid}/")

def prep_switch_interface(
self,
Expand All @@ -168,9 +163,7 @@ def prep_switch_interface(
"modify_native_vlan": modify_native_vlan,
"vlan_tag": vlan_tag,
}
resp_data = self.make_api_request(url, "post", payload)

return resp_data
return self.make_api_request("POST", url, payload)

def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str:
"""Runs a Nautobot Job to cleanup a switch interface.
Expand All @@ -185,30 +178,28 @@ def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str:
"ucvni_uuid": str(ucvni_uuid),
"connected_interface_id": str(connected_interface_id),
}
resp_data = self.make_api_request(url, "post", payload)
resp_data = self.make_api_request("POST", url, payload)

return resp_data["vlan_group_id"]

def configure_port_status(self, interface_uuid: str, status: str) -> dict:
url = f"/api/dcim/interfaces/{interface_uuid}/"
payload = {"status": {"name": status}}
resp_data = self.make_api_request(url, "patch", payload)
return resp_data
return self.make_api_request("PATCH", url, payload)

def fetch_vlan_group_uuid(self, device_uuid: str) -> str:
url = f"/api/dcim/devices/{device_uuid}/?include=relationships"

resp_data = self.make_api_request(url, "get")
resp_data = self.make_api_request("GET", url)
try:
vlan_group_uuid = resp_data["relationships"]["vlan_group_to_devices"][
"source"
]["objects"][0]["id"]
except (KeyError, IndexError, TypeError) as error:
LOG.error("vlan_group_uuid_error: %(error)s", {"error": error})
raise NautobotError() from error
except (KeyError, IndexError, TypeError) as err:
raise NautobotNotFoundError(obj="device", ref=device_uuid) from err

LOG.debug(
"vlan_group_uuid: %(vlan_group_uuid)s", {"vlan_group_uuid": vlan_group_uuid}
"Device %s belongs to vlan_group_uuid %s", device_uuid, vlan_group_uuid
)
return vlan_group_uuid

Expand All @@ -220,11 +211,7 @@ def check_vlan_availability(self, interface_id: str | UUID, vlan_tag: int) -> bo
interface, identified by `interface_id`, is connected.
"""
url = "/api/plugins/undercloud-vni/vlan_availability_check"
params = {
"interface_id": str(interface_id),
"vlan_tag": int(vlan_tag),
}
response = self.make_api_request(url, "get", params=params)
if not response:
return False
params = {"interface_id": interface_id, "vlan_tag": str(vlan_tag)}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the conversion to integer was intentional here - we want to fail early if the user calls this method with a non-numeric vlan_tag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I will put it back - I naively assumed that a parameter of type "int" must contain an int value :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python's type whispering in full force. It's more like an ask instead of an insistence.


response = self.make_api_request("GET", url, params=params) or {}
return response.get("available", False)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def create_vlan_for_network(self, context: NetworkContext):
if not context.current:
raise RuntimeError("no current context provided.")

vlan_tag = context.current["provider:segmentation_id"]
vlan_tag = int(context.current["provider:segmentation_id"])
allocated = self._allocate_vlan(context, vlan_tag)

if allocated:
Expand Down