-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Refactor Nautobot api client and update logging / exception handling #666
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,77 +1,78 @@ | ||
import inspect | ||
import pathlib | ||
from pprint import pformat | ||
from urllib.parse import urljoin | ||
from uuid import UUID | ||
|
||
import requests | ||
from neutron_lib import exceptions as exc | ||
from oslo_log import log | ||
from requests.models import HTTPError | ||
|
||
LOG = log.getLogger(__name__) | ||
|
||
|
||
class NautobotError(exc.NeutronException): | ||
message = "Nautobot error" | ||
class NautobotRequestError(exc.NeutronException): | ||
message = "Nautobot API returned error %(code)s for %(url)s: %(body)s" | ||
|
||
|
||
class NautobotNotFoundError(NautobotError): | ||
message = "%(obj)s not found in Nautobot. ref=%(ref)s" | ||
class NautobotOSError(exc.NeutronException): | ||
message = "Error occurred querying Nautobot: %(err)s" | ||
|
||
|
||
class Nautobot: | ||
CALLER_FRAME = 1 | ||
class NautobotNotFoundError(exc.NeutronException): | ||
message = "%(obj)s not found in Nautobot. ref=%(ref)s" | ||
|
||
def __init__(self, nb_url: str | None = None, nb_token: str | None = None): | ||
"""Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs.""" | ||
self.base_url = nb_url or "http://nautobot-default.nautobot.svc.cluster.local" | ||
self.token = nb_token or self._fetch_nb_token() | ||
self.s = requests.Session() | ||
self.s.headers.update({"Authorization": f"Token {self.token}"}) | ||
|
||
def _fetch_nb_token(self): | ||
file = pathlib.Path("/etc/nb-token/token") | ||
with file.open() as f: | ||
return f.read().strip() | ||
class Nautobot: | ||
"""Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs.""" | ||
|
||
def _log_and_raise_for_status(self, response): | ||
try: | ||
response.raise_for_status() | ||
except HTTPError as error: | ||
LOG.error("Nautobot error: %s %s", error, response.content) | ||
raise NautobotError() from error | ||
def __init__(self, nb_url: str, nb_token: str): | ||
self.base_url = nb_url | ||
self.session = requests.Session() | ||
self.session.headers.update({"Authorization": f"Token {nb_token}"}) | ||
|
||
def make_api_request( | ||
self, url: str, method: str, payload: dict | None = None, params=None | ||
self, | ||
method: str, | ||
url: str, | ||
payload: dict | None = None, | ||
params: dict[str, str] | None = None, | ||
timeout: int = 10, | ||
) -> dict: | ||
endpoint_url = urljoin(self.base_url, url) | ||
caller_function = inspect.stack()[self.CALLER_FRAME].function | ||
http_method = method.upper() | ||
full_url = urljoin(self.base_url, url) | ||
|
||
LOG.debug( | ||
"%(caller_function)s payload: %(payload)s", | ||
{"payload": payload, "caller_function": caller_function}, | ||
) | ||
resp = self.s.request( | ||
http_method, | ||
endpoint_url, | ||
timeout=10, | ||
json=payload, | ||
params=params, | ||
allow_redirects=False, | ||
) | ||
|
||
if resp.content: | ||
resp_data = resp.json() | ||
else: | ||
resp_data = {"status_code": resp.status_code} | ||
try: | ||
response = self.session.request( | ||
method, | ||
full_url, | ||
timeout=timeout, | ||
json=payload, | ||
params=params, | ||
allow_redirects=False, | ||
) | ||
except Exception as e: | ||
raise NautobotOSError(err=e) from e | ||
|
||
if response.status_code >= 300: | ||
raise NautobotRequestError( | ||
code=response.status_code, url=full_url, body=response.content | ||
) | ||
if not response.content: | ||
response_data = {"status_code": response.status_code} | ||
try: | ||
response_data = response.json() | ||
except requests.exceptions.JSONDecodeError: | ||
response_data = {"body": response.content} | ||
|
||
caller_function = inspect.stack()[1].function | ||
LOG.debug( | ||
"%(caller_function)s resp: %(resp)s", | ||
{"resp": resp_data, "caller_function": caller_function}, | ||
"[%s] %s %s %s ==> %s", | ||
caller_function, | ||
full_url, | ||
method, | ||
pformat(payload), | ||
pformat(response_data), | ||
) | ||
self._log_and_raise_for_status(resp) | ||
return resp_data | ||
return response_data | ||
|
||
def ucvni_create( | ||
self, | ||
|
@@ -92,40 +93,35 @@ def ucvni_create( | |
payload["ucvni_type"] = "INFRA" | ||
|
||
url = "/api/plugins/undercloud-vni/ucvnis/" | ||
resp_data = self.make_api_request(url, "post", payload) | ||
return resp_data | ||
return self.make_api_request("POST", url, payload) | ||
|
||
def ucvni_delete(self, network_id): | ||
url = f"/api/plugins/undercloud-vni/ucvnis/{network_id}/" | ||
return self.make_api_request(url, "delete") | ||
return self.make_api_request("DELETE", url) | ||
|
||
def fetch_namespace_by_name(self, name: str) -> str: | ||
url = f"/api/ipam/namespaces/?name={name}&depth=1" | ||
resp_data = self.make_api_request(url, "get") | ||
resp_data = self.make_api_request("GET", url) | ||
try: | ||
return resp_data["results"][0]["id"] | ||
except (IndexError, KeyError) as error: | ||
LOG.error("Nautobot error: %(error)s", {"error": error}) | ||
raise NautobotNotFoundError(obj="namespace", ref=name) from error | ||
|
||
def namespace_create(self, name: str) -> dict: | ||
url = "/api/ipam/namespaces/" | ||
payload = {"name": name} | ||
return self.make_api_request(url, "post", payload) | ||
return self.make_api_request("POST", "/api/ipam/namespaces/", payload) | ||
|
||
def namespace_delete(self, namespace_uuid: str) -> dict: | ||
url = f"/api/ipam/namespaces/{namespace_uuid}/" | ||
return self.make_api_request(url, "delete") | ||
def namespace_delete(self, uuid: str) -> dict: | ||
return self.make_api_request("DELETE", f"/api/ipam/namespaces/{uuid}/") | ||
|
||
def subnet_create(self, subnet_uuid: str, prefix: str, namespace_name: str) -> dict: | ||
url = "/api/ipam/prefixes/" | ||
payload = { | ||
"id": subnet_uuid, | ||
"prefix": prefix, | ||
"status": "Active", | ||
"namespace": {"name": namespace_name}, | ||
} | ||
return self.make_api_request(url, "post", payload) | ||
return self.make_api_request("POST", "/api/ipam/prefixes/", payload) | ||
|
||
def associate_subnet_with_network( | ||
self, network_uuid: str, subnet_uuid: str, role: str | ||
|
@@ -141,11 +137,10 @@ def associate_subnet_with_network( | |
}, | ||
}, | ||
} | ||
self.make_api_request(url, "patch", payload) | ||
self.make_api_request("PATCH", url, payload) | ||
|
||
def subnet_delete(self, subnet_uuid: str) -> dict: | ||
url = f"/api/ipam/prefixes/{subnet_uuid}/" | ||
return self.make_api_request(url, "delete") | ||
def subnet_delete(self, uuid: str) -> dict: | ||
return self.make_api_request("DELETE", f"/api/ipam/prefixes/{uuid}/") | ||
|
||
def prep_switch_interface( | ||
self, | ||
|
@@ -168,9 +163,7 @@ def prep_switch_interface( | |
"modify_native_vlan": modify_native_vlan, | ||
"vlan_tag": vlan_tag, | ||
} | ||
resp_data = self.make_api_request(url, "post", payload) | ||
|
||
return resp_data | ||
return self.make_api_request("POST", url, payload) | ||
|
||
def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str: | ||
"""Runs a Nautobot Job to cleanup a switch interface. | ||
|
@@ -185,30 +178,28 @@ def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str: | |
"ucvni_uuid": str(ucvni_uuid), | ||
"connected_interface_id": str(connected_interface_id), | ||
} | ||
resp_data = self.make_api_request(url, "post", payload) | ||
resp_data = self.make_api_request("POST", url, payload) | ||
|
||
return resp_data["vlan_group_id"] | ||
|
||
def configure_port_status(self, interface_uuid: str, status: str) -> dict: | ||
url = f"/api/dcim/interfaces/{interface_uuid}/" | ||
payload = {"status": {"name": status}} | ||
resp_data = self.make_api_request(url, "patch", payload) | ||
return resp_data | ||
return self.make_api_request("PATCH", url, payload) | ||
|
||
def fetch_vlan_group_uuid(self, device_uuid: str) -> str: | ||
url = f"/api/dcim/devices/{device_uuid}/?include=relationships" | ||
|
||
resp_data = self.make_api_request(url, "get") | ||
resp_data = self.make_api_request("GET", url) | ||
try: | ||
vlan_group_uuid = resp_data["relationships"]["vlan_group_to_devices"][ | ||
"source" | ||
]["objects"][0]["id"] | ||
except (KeyError, IndexError, TypeError) as error: | ||
LOG.error("vlan_group_uuid_error: %(error)s", {"error": error}) | ||
raise NautobotError() from error | ||
except (KeyError, IndexError, TypeError) as err: | ||
raise NautobotNotFoundError(obj="device", ref=device_uuid) from err | ||
|
||
LOG.debug( | ||
"vlan_group_uuid: %(vlan_group_uuid)s", {"vlan_group_uuid": vlan_group_uuid} | ||
"Device %s belongs to vlan_group_uuid %s", device_uuid, vlan_group_uuid | ||
) | ||
return vlan_group_uuid | ||
|
||
|
@@ -220,11 +211,7 @@ def check_vlan_availability(self, interface_id: str | UUID, vlan_tag: int) -> bo | |
interface, identified by `interface_id`, is connected. | ||
""" | ||
url = "/api/plugins/undercloud-vni/vlan_availability_check" | ||
params = { | ||
"interface_id": str(interface_id), | ||
"vlan_tag": int(vlan_tag), | ||
} | ||
response = self.make_api_request(url, "get", params=params) | ||
if not response: | ||
return False | ||
params = {"interface_id": interface_id, "vlan_tag": str(vlan_tag)} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the conversion to integer was intentional here - we want to fail early if the user calls this method with a non-numeric There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I will put it back - I naively assumed that a parameter of type "int" must contain an int value :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Python's type whispering in full force. It's more like an ask instead of an insistence. |
||
|
||
response = self.make_api_request("GET", url, params=params) or {} | ||
return response.get("available", False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
long term you'll probably want the
response.json()["error"]
by default.