diff --git a/python/neutron-understack/neutron_understack/nautobot.py b/python/neutron-understack/neutron_understack/nautobot.py index e28a5df1..423ffa6d 100644 --- a/python/neutron-understack/neutron_understack/nautobot.py +++ b/python/neutron-understack/neutron_understack/nautobot.py @@ -1,77 +1,78 @@ import inspect -import pathlib +from pprint import pformat from urllib.parse import urljoin from uuid import UUID import requests from neutron_lib import exceptions as exc from oslo_log import log -from requests.models import HTTPError LOG = log.getLogger(__name__) -class NautobotError(exc.NeutronException): - message = "Nautobot error" +class NautobotRequestError(exc.NeutronException): + message = "Nautobot API returned error %(code)s for %(url)s: %(body)s" -class NautobotNotFoundError(NautobotError): - message = "%(obj)s not found in Nautobot. ref=%(ref)s" +class NautobotOSError(exc.NeutronException): + message = "Error occurred querying Nautobot: %(err)s" -class Nautobot: - CALLER_FRAME = 1 +class NautobotNotFoundError(exc.NeutronException): + message = "%(obj)s not found in Nautobot. ref=%(ref)s" - def __init__(self, nb_url: str | None = None, nb_token: str | None = None): - """Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs.""" - self.base_url = nb_url or "http://nautobot-default.nautobot.svc.cluster.local" - self.token = nb_token or self._fetch_nb_token() - self.s = requests.Session() - self.s.headers.update({"Authorization": f"Token {self.token}"}) - def _fetch_nb_token(self): - file = pathlib.Path("/etc/nb-token/token") - with file.open() as f: - return f.read().strip() +class Nautobot: + """Basic Nautobot wrapper because pynautobot doesn't expose plugin APIs.""" - def _log_and_raise_for_status(self, response): - try: - response.raise_for_status() - except HTTPError as error: - LOG.error("Nautobot error: %s %s", error, response.content) - raise NautobotError() from error + def __init__(self, nb_url: str, nb_token: str): + self.base_url = nb_url + self.session = requests.Session() + self.session.headers.update({"Authorization": f"Token {nb_token}"}) def make_api_request( - self, url: str, method: str, payload: dict | None = None, params=None + self, + method: str, + url: str, + payload: dict | None = None, + params: dict[str, str] | None = None, + timeout: int = 10, ) -> dict: - endpoint_url = urljoin(self.base_url, url) - caller_function = inspect.stack()[self.CALLER_FRAME].function - http_method = method.upper() + full_url = urljoin(self.base_url, url) - LOG.debug( - "%(caller_function)s payload: %(payload)s", - {"payload": payload, "caller_function": caller_function}, - ) - resp = self.s.request( - http_method, - endpoint_url, - timeout=10, - json=payload, - params=params, - allow_redirects=False, - ) - - if resp.content: - resp_data = resp.json() - else: - resp_data = {"status_code": resp.status_code} + try: + response = self.session.request( + method, + full_url, + timeout=timeout, + json=payload, + params=params, + allow_redirects=False, + ) + except Exception as e: + raise NautobotOSError(err=e) from e + + if response.status_code >= 300: + raise NautobotRequestError( + code=response.status_code, url=full_url, body=response.content + ) + if not response.content: + response_data = {"status_code": response.status_code} + try: + response_data = response.json() + except requests.exceptions.JSONDecodeError: + response_data = {"body": response.content} + caller_function = inspect.stack()[1].function LOG.debug( - "%(caller_function)s resp: %(resp)s", - {"resp": resp_data, "caller_function": caller_function}, + "[%s] %s %s %s ==> %s", + caller_function, + full_url, + method, + pformat(payload), + pformat(response_data), ) - self._log_and_raise_for_status(resp) - return resp_data + return response_data def ucvni_create( self, @@ -92,40 +93,35 @@ def ucvni_create( payload["ucvni_type"] = "INFRA" url = "/api/plugins/undercloud-vni/ucvnis/" - resp_data = self.make_api_request(url, "post", payload) - return resp_data + return self.make_api_request("POST", url, payload) def ucvni_delete(self, network_id): url = f"/api/plugins/undercloud-vni/ucvnis/{network_id}/" - return self.make_api_request(url, "delete") + return self.make_api_request("DELETE", url) def fetch_namespace_by_name(self, name: str) -> str: url = f"/api/ipam/namespaces/?name={name}&depth=1" - resp_data = self.make_api_request(url, "get") + resp_data = self.make_api_request("GET", url) try: return resp_data["results"][0]["id"] except (IndexError, KeyError) as error: - LOG.error("Nautobot error: %(error)s", {"error": error}) raise NautobotNotFoundError(obj="namespace", ref=name) from error def namespace_create(self, name: str) -> dict: - url = "/api/ipam/namespaces/" payload = {"name": name} - return self.make_api_request(url, "post", payload) + return self.make_api_request("POST", "/api/ipam/namespaces/", payload) - def namespace_delete(self, namespace_uuid: str) -> dict: - url = f"/api/ipam/namespaces/{namespace_uuid}/" - return self.make_api_request(url, "delete") + def namespace_delete(self, uuid: str) -> dict: + return self.make_api_request("DELETE", f"/api/ipam/namespaces/{uuid}/") def subnet_create(self, subnet_uuid: str, prefix: str, namespace_name: str) -> dict: - url = "/api/ipam/prefixes/" payload = { "id": subnet_uuid, "prefix": prefix, "status": "Active", "namespace": {"name": namespace_name}, } - return self.make_api_request(url, "post", payload) + return self.make_api_request("POST", "/api/ipam/prefixes/", payload) def associate_subnet_with_network( self, network_uuid: str, subnet_uuid: str, role: str @@ -141,11 +137,10 @@ def associate_subnet_with_network( }, }, } - self.make_api_request(url, "patch", payload) + self.make_api_request("PATCH", url, payload) - def subnet_delete(self, subnet_uuid: str) -> dict: - url = f"/api/ipam/prefixes/{subnet_uuid}/" - return self.make_api_request(url, "delete") + def subnet_delete(self, uuid: str) -> dict: + return self.make_api_request("DELETE", f"/api/ipam/prefixes/{uuid}/") def prep_switch_interface( self, @@ -168,9 +163,7 @@ def prep_switch_interface( "modify_native_vlan": modify_native_vlan, "vlan_tag": vlan_tag, } - resp_data = self.make_api_request(url, "post", payload) - - return resp_data + return self.make_api_request("POST", url, payload) def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str: """Runs a Nautobot Job to cleanup a switch interface. @@ -185,30 +178,28 @@ def detach_port(self, connected_interface_id: str, ucvni_uuid: str) -> str: "ucvni_uuid": str(ucvni_uuid), "connected_interface_id": str(connected_interface_id), } - resp_data = self.make_api_request(url, "post", payload) + resp_data = self.make_api_request("POST", url, payload) return resp_data["vlan_group_id"] def configure_port_status(self, interface_uuid: str, status: str) -> dict: url = f"/api/dcim/interfaces/{interface_uuid}/" payload = {"status": {"name": status}} - resp_data = self.make_api_request(url, "patch", payload) - return resp_data + return self.make_api_request("PATCH", url, payload) def fetch_vlan_group_uuid(self, device_uuid: str) -> str: url = f"/api/dcim/devices/{device_uuid}/?include=relationships" - resp_data = self.make_api_request(url, "get") + resp_data = self.make_api_request("GET", url) try: vlan_group_uuid = resp_data["relationships"]["vlan_group_to_devices"][ "source" ]["objects"][0]["id"] - except (KeyError, IndexError, TypeError) as error: - LOG.error("vlan_group_uuid_error: %(error)s", {"error": error}) - raise NautobotError() from error + except (KeyError, IndexError, TypeError) as err: + raise NautobotNotFoundError(obj="device", ref=device_uuid) from err LOG.debug( - "vlan_group_uuid: %(vlan_group_uuid)s", {"vlan_group_uuid": vlan_group_uuid} + "Device %s belongs to vlan_group_uuid %s", device_uuid, vlan_group_uuid ) return vlan_group_uuid @@ -220,11 +211,7 @@ def check_vlan_availability(self, interface_id: str | UUID, vlan_tag: int) -> bo interface, identified by `interface_id`, is connected. """ url = "/api/plugins/undercloud-vni/vlan_availability_check" - params = { - "interface_id": str(interface_id), - "vlan_tag": int(vlan_tag), - } - response = self.make_api_request(url, "get", params=params) - if not response: - return False + params = {"interface_id": interface_id, "vlan_tag": str(vlan_tag)} + + response = self.make_api_request("GET", url, params=params) or {} return response.get("available", False) diff --git a/python/neutron-understack/neutron_understack/vlan_manager.py b/python/neutron-understack/neutron_understack/vlan_manager.py index 2a8c788f..aa4deb33 100644 --- a/python/neutron-understack/neutron_understack/vlan_manager.py +++ b/python/neutron-understack/neutron_understack/vlan_manager.py @@ -26,7 +26,7 @@ def create_vlan_for_network(self, context: NetworkContext): if not context.current: raise RuntimeError("no current context provided.") - vlan_tag = context.current["provider:segmentation_id"] + vlan_tag = int(context.current["provider:segmentation_id"]) allocated = self._allocate_vlan(context, vlan_tag) if allocated: