From af3eacf611e4cbc93c4e306b5b8b1c328158ec07 Mon Sep 17 00:00:00 2001 From: Graham Dumpleton Date: Tue, 6 Aug 2024 20:17:04 +1000 Subject: [PATCH] New lookup service code. (#527) * Add initial files for workshop lookup service. * Add handling of cluster configurations. * Update HTTP error response codes for lookup service. * Rename auth code file. * Resource renaming and additional REST API endpoints. * Drop unnecessary endpoint. * Add means to register local cluster in lookup service. * Add tracking of training portals. * Capture portal auth details. * Add tracking of workshops. * Add lookup for portals hosting workshop. * Renaming and start adding lookup of environments. * Create derived view of portal and environment resources. * Cross link actual cluster, portal and environment objects. * Code restructuring. * Fold environment database into portal. * Fold portal database into cluster. * Encapsulate REST API calls in portal. * Start of trying to track sessions. * Update to use user tracking for workshop sessions. * Add additional details in workshop request response. * Drop old code which is no longer required. * Add scoring for candidate workshop environments. * Add debugging for workshop environment selection. * Update capacity details for workshop environment. * Add logging for workshop requests. * Code cleanup and add missing functionality. * Split out routes registration from main program module. * Add ability to query workshops by tenants endpoint. * Add means to get portals from clusters. * Align naming. * Expand APIs for clusters. * Simplify roles. * Allow wildcards on tenant and cluster/portal selectors. * Use environment variable for configuration namespace. --- .gitignore | 1 + Makefile | 10 +- lookup-service/Dockerfile | 39 ++ lookup-service/README.md | 6 + lookup-service/requirements.txt | 7 + lookup-service/service/__init__.py | 0 lookup-service/service/caches/__init__.py | 0 lookup-service/service/caches/clients.py | 47 ++ lookup-service/service/caches/clusters.py | 48 ++ lookup-service/service/caches/databases.py | 126 ++++ lookup-service/service/caches/environments.py | 126 ++++ lookup-service/service/caches/portals.py | 346 +++++++++++ lookup-service/service/caches/sessions.py | 37 ++ lookup-service/service/caches/tenants.py | 71 +++ lookup-service/service/config.py | 36 ++ lookup-service/service/handlers/__init__.py | 0 lookup-service/service/handlers/clients.py | 68 +++ lookup-service/service/handlers/clusters.py | 568 ++++++++++++++++++ lookup-service/service/handlers/tenants.py | 64 ++ lookup-service/service/helpers/__init__.py | 0 lookup-service/service/helpers/kubeconfig.py | 240 ++++++++ lookup-service/service/helpers/objects.py | 21 + lookup-service/service/helpers/operator.py | 172 ++++++ lookup-service/service/helpers/selectors.py | 144 +++++ lookup-service/service/main.py | 251 ++++++++ lookup-service/service/routes/__init__.py | 22 + lookup-service/service/routes/authnz.py | 212 +++++++ lookup-service/service/routes/clients.py | 55 ++ lookup-service/service/routes/clusters.py | 314 ++++++++++ lookup-service/service/routes/portals.py | 44 ++ lookup-service/service/routes/tenants.py | 127 ++++ lookup-service/service/routes/workshops.py | 334 ++++++++++ lookup-service/service/service.py | 21 + lookup-service/start-service.sh | 5 + 34 files changed, 3560 insertions(+), 2 deletions(-) create mode 100644 lookup-service/Dockerfile create mode 100644 lookup-service/README.md create mode 100644 lookup-service/requirements.txt create mode 100644 
lookup-service/service/__init__.py create mode 100644 lookup-service/service/caches/__init__.py create mode 100644 lookup-service/service/caches/clients.py create mode 100644 lookup-service/service/caches/clusters.py create mode 100644 lookup-service/service/caches/databases.py create mode 100644 lookup-service/service/caches/environments.py create mode 100644 lookup-service/service/caches/portals.py create mode 100644 lookup-service/service/caches/sessions.py create mode 100644 lookup-service/service/caches/tenants.py create mode 100644 lookup-service/service/config.py create mode 100644 lookup-service/service/handlers/__init__.py create mode 100644 lookup-service/service/handlers/clients.py create mode 100644 lookup-service/service/handlers/clusters.py create mode 100644 lookup-service/service/handlers/tenants.py create mode 100644 lookup-service/service/helpers/__init__.py create mode 100644 lookup-service/service/helpers/kubeconfig.py create mode 100644 lookup-service/service/helpers/objects.py create mode 100644 lookup-service/service/helpers/operator.py create mode 100644 lookup-service/service/helpers/selectors.py create mode 100644 lookup-service/service/main.py create mode 100644 lookup-service/service/routes/__init__.py create mode 100644 lookup-service/service/routes/authnz.py create mode 100644 lookup-service/service/routes/clients.py create mode 100644 lookup-service/service/routes/clusters.py create mode 100644 lookup-service/service/routes/portals.py create mode 100644 lookup-service/service/routes/tenants.py create mode 100644 lookup-service/service/routes/workshops.py create mode 100644 lookup-service/service/service.py create mode 100755 lookup-service/start-service.sh diff --git a/.gitignore b/.gitignore index 06651b6a2..edfb6b9b5 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__ /client-programs/bin /client-programs/pkg/renderer/files /developer-testing +/lookup-service/venv /project-docs/venv /project-docs/_build /session-manager/venv diff --git a/Makefile b/Makefile index f3de391c9..166866828 100644 --- a/Makefile +++ b/Makefile @@ -24,14 +24,14 @@ build-all-images: build-session-manager build-training-portal \ build-jdk17-environment build-jdk21-environment \ build-conda-environment build-docker-registry \ build-pause-container build-secrets-manager build-tunnel-manager \ - build-image-cache build-assets-server + build-image-cache build-assets-server build-lookup-service push-all-images: push-session-manager push-training-portal \ push-base-environment push-jdk8-environment push-jdk11-environment \ push-jdk17-environment push-jdk21-environment \ push-conda-environment push-docker-registry \ push-pause-container push-secrets-manager push-tunnel-manager \ - push-image-cache push-assets-server + push-image-cache push-assets-server push-lookup-service build-core-images: build-session-manager build-training-portal \ build-base-environment build-docker-registry build-pause-container \ @@ -133,6 +133,12 @@ build-assets-server: push-assets-server: build-assets-server docker push $(IMAGE_REPOSITORY)/educates-assets-server:$(PACKAGE_VERSION) +build-lookup-service: + docker build --progress plain --platform $(DOCKER_PLATFORM) -t $(IMAGE_REPOSITORY)/educates-lookup-service:$(PACKAGE_VERSION) lookup-service + +push-lookup-service: build-lookup-service + docker push $(IMAGE_REPOSITORY)/educates-lookup-service:$(PACKAGE_VERSION) + verify-installer-config: ifneq ("$(wildcard developer-testing/educates-installer-values.yaml)","") @ytt --file 
carvel-packages/installer/bundle/config --data-values-file developer-testing/educates-installer-values.yaml diff --git a/lookup-service/Dockerfile b/lookup-service/Dockerfile new file mode 100644 index 000000000..0f4752279 --- /dev/null +++ b/lookup-service/Dockerfile @@ -0,0 +1,39 @@ +FROM fedora:39 + +RUN INSTALL_PKGS=" \ + findutils \ + gcc \ + glibc-langpack-en \ + procps \ + python3-devel \ + python3-pip \ + redhat-rpm-config \ + which \ + " && \ + dnf install -y --setopt=tsflags=nodocs $INSTALL_PKGS && \ + dnf clean -y --enablerepo='*' all && \ + useradd -u 1001 -g 0 -M -d /opt/app-root/src default && \ + mkdir -p /opt/app-root/src && \ + chown -R 1001:0 /opt/app-root + +WORKDIR /opt/app-root/src + +ENV PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + LC_ALL=en_US.UTF-8 \ + LANG=en_US.UTF-8 + +USER 1001 + +COPY --chown=1001:0 requirements.txt /opt/app-root/requirements.txt + +ENV PATH=/opt/app-root/bin:/opt/app-root/venv/bin:$PATH + +RUN python3 -m venv /opt/app-root/venv && \ + . /opt/app-root/venv/bin/activate && \ + pip install --no-cache-dir -U pip setuptools wheel && \ + pip install --no-cache-dir -r /opt/app-root/requirements.txt + +COPY --chown=1001:0 ./ /opt/app-root/src + +CMD [ "/opt/app-root/src/start-service.sh" ] diff --git a/lookup-service/README.md b/lookup-service/README.md new file mode 100644 index 000000000..d6bc49e3e --- /dev/null +++ b/lookup-service/README.md @@ -0,0 +1,6 @@ +Lookup Service +============== + +This directory holds the source code for the Educates lookup service. It +provides a high level REST API for accessing workshops, where workshops may +be spread across one or more training portals, including across clusters. diff --git a/lookup-service/requirements.txt b/lookup-service/requirements.txt new file mode 100644 index 000000000..a308d2bbc --- /dev/null +++ b/lookup-service/requirements.txt @@ -0,0 +1,7 @@ +kopf[full-auth]==1.37.2 +bcrypt==4.1.3 +aiohttp==3.9.5 +PyYAML==6.0.1 +pykube-ng==23.6.0 +wrapt==1.16.0 +PyJWT==2.8.0 diff --git a/lookup-service/service/__init__.py b/lookup-service/service/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lookup-service/service/caches/__init__.py b/lookup-service/service/caches/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lookup-service/service/caches/clients.py b/lookup-service/service/caches/clients.py new file mode 100644 index 000000000..f6f586240 --- /dev/null +++ b/lookup-service/service/caches/clients.py @@ -0,0 +1,47 @@ +"""Configuration for clients of the service.""" + +import fnmatch +from dataclasses import dataclass +from typing import List, Set + + +@dataclass +class ClientConfig: + """Configuration object for a client of the service.""" + + name: str + uid: str + password: str + tenants: List[str] + roles: List[str] + + def check_password(self, password: str) -> bool: + """Checks the password provided against the client's password.""" + + return self.password == password + + def validate_identity(self, uid: str) -> bool: + """Validate the identity provided against the client's identity.""" + + return self.uid == uid + + def has_required_role(self, *roles: str) -> Set: + """Check if the client has any of the roles provided. 
We return back a + set containing the roles that matched.""" + + matched_roles = set() + + for role in roles: + if role in self.roles: + matched_roles.add(role) + + return matched_roles + + def allowed_access_to_tenant(self, tenant: str) -> bool: + """Check if the client has access to the tenant.""" + + for pattern in self.tenants: + if fnmatch.fnmatch(tenant, pattern): + return True + + return False diff --git a/lookup-service/service/caches/clusters.py b/lookup-service/service/caches/clusters.py new file mode 100644 index 000000000..8ae031a98 --- /dev/null +++ b/lookup-service/service/caches/clusters.py @@ -0,0 +1,48 @@ +"""Configuration for target clusters.""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Dict, List + +if TYPE_CHECKING: + from .portals import TrainingPortal + + +@dataclass +class ClusterConfig: + """Configuration object for a target cluster. This includes a database of + the training portals hosted on the cluster.""" + + name: str + uid: str + labels: Dict[str, str] + kubeconfig: Dict[str, Any] + portals: Dict[str, "TrainingPortal"] + + def __init__( + self, name: str, uid: str, labels: Dict[str, str], kubeconfig: Dict[str, Any] + ): + self.name = name + self.uid = uid + self.labels = labels + self.kubeconfig = kubeconfig + self.portals = {} + + def add_portal(self, portal: "TrainingPortal") -> None: + """Add a portal to the cluster.""" + + self.portals[portal.name] = portal + + def remove_portal(self, name: str) -> None: + """Remove a portal from the cluster.""" + + self.portals.pop(name, None) + + def get_portals(self) -> List["TrainingPortal"]: + """Retrieve a list of portals from the cluster.""" + + return list(self.portals.values()) + + def get_portal(self, name: str) -> "TrainingPortal": + """Retrieve a portal from the cluster by name.""" + + return self.portals.get(name) diff --git a/lookup-service/service/caches/databases.py b/lookup-service/service/caches/databases.py new file mode 100644 index 000000000..cab754c34 --- /dev/null +++ b/lookup-service/service/caches/databases.py @@ -0,0 +1,126 @@ +"""Database classes for storing state of everything.""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, List + +if TYPE_CHECKING: + from .clients import ClientConfig + from .clusters import ClusterConfig + from .tenants import TenantConfig + + +@dataclass +class ClientDatabase: + """Database for storing client configurations. Clients are stored in a + dictionary with the client's name as the key and the client configuration + object as the value.""" + + clients: Dict[str, "ClientConfig"] + + def __init__(self) -> None: + self.clients = {} + + def update_client(self, client: "ClientConfig") -> None: + """Update the client in the database. If the client does not exist in + the database, it will be added.""" + + self.clients[client.name] = client + + def remove_client(self, name: str) -> None: + """Remove a client from the database.""" + + self.clients.pop(name, None) + + def get_clients(self) -> List["ClientConfig"]: + """Retrieve a list of clients from the database.""" + + return list(self.clients.values()) + + def get_client(self, name: str) -> "ClientConfig": + """Retrieve a client from the database by name.""" + + return self.clients.get(name) + + def authenticate_client(self, name: str, password: str) -> str | None: + """Validate a client's credentials. 
Returning the uid of the client if + the credentials are valid.""" + + client = self.get_client(name) + + if client is None: + return + + if client.check_password(password): + return client.uid + + +@dataclass +class TenantDatabase: + """Database for storing tenant configurations. Tenants are stored in a + dictionary with the tenant's name as the key and the tenant configuration + object as the value.""" + + tenants: Dict[str, "TenantConfig"] + + def __init__(self): + self.tenants = {} + + def update_tenant(self, tenant: "TenantConfig") -> None: + """Update the tenant in the database. If the tenant does not exist in + the database, it will be added.""" + + self.tenants[tenant.name] = tenant + + def remove_tenant(self, name: str) -> None: + """Remove a tenant from the database.""" + + self.tenants.pop(name, None) + + def get_tenants(self) -> List["TenantConfig"]: + """Retrieve a list of tenants from the database.""" + + return list(self.tenants.values()) + + def get_tenant(self, name: str) -> "TenantConfig": + """Retrieve a tenant from the database by name.""" + + return self.tenants.get(name) + + +@dataclass +class ClusterDatabase: + """Database for storing cluster configurations. Clusters are stored in a + dictionary with the cluster's name as the key and the cluster configuration + object as the value.""" + + clusters: Dict[str, "ClusterConfig"] + + def __init__(self) -> None: + self.clusters = {} + + def add_cluster(self, cluster: "ClusterConfig") -> None: + """Add the cluster to the database.""" + + self.clusters[cluster.name] = cluster + + def remove_cluster(self, name: str) -> None: + """Remove a cluster from the database.""" + + self.clusters.pop(name, None) + + def get_clusters(self) -> List["ClusterConfig"]: + """Retrieve a list of clusters from the database.""" + + return list(self.clusters.values()) + + def get_cluster(self, name: str) -> "ClusterConfig": + """Retrieve a cluster from the database by name.""" + + return self.clusters.get(name) + + +# Create the database instances. + +client_database = ClientDatabase() +tenant_database = TenantDatabase() +cluster_database = ClusterDatabase() diff --git a/lookup-service/service/caches/environments.py b/lookup-service/service/caches/environments.py new file mode 100644 index 000000000..95f471eb1 --- /dev/null +++ b/lookup-service/service/caches/environments.py @@ -0,0 +1,126 @@ +"""Configuration for workshop environments.""" + +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, List + +from aiohttp import ClientSession +from wrapt import synchronized + +if TYPE_CHECKING: + from .portals import TrainingPortal + from .sessions import WorkshopSession + +logger = logging.getLogger("educates") + + +@dataclass +class WorkshopEnvironment: + """Snapshot of workshop environment state. 
This includes a database of + the workshop sessions created from the workshop environment.""" + + portal: "TrainingPortal" + name: str + generation: int + workshop: str + title: str + description: str + labels: Dict[str, str] + capacity: int + reserved: int + allocated: int + available: int + phase: str + sessions: Dict[str, "WorkshopSession"] + + def __init__( + self, + portal: "TrainingPortal", + name: str, + generation: int, + workshop: str, + title: str, + description: str, + labels: Dict[str, str], + capacity: int, + reserved: int, + allocated: int, + available: int, + phase: str, + ) -> None: + self.portal = portal + self.name = name + self.generation = generation + self.workshop = workshop + self.title = title + self.description = description + self.labels = labels + self.capacity = capacity + self.reserved = reserved + self.allocated = allocated + self.available = available + self.phase = phase + self.sessions = {} + + def get_sessions(self) -> Dict[str, "WorkshopSession"]: + """Returns all workshop sessions.""" + + return list(self.sessions.values()) + + def get_session(self, session_name: str) -> "WorkshopSession": + """Returns a workshop session by name.""" + + return self.sessions.get(session_name) + + def add_session(self, session: "WorkshopSession") -> None: + """Add a session to the environment.""" + + self.sessions[session.name] = session + + def remove_session(self, session_name: str) -> None: + """Remove a session from the environment.""" + + self.sessions.pop(session_name, None) + + @synchronized + def recalculate_capacity(self) -> None: + """Recalculate the available capacity of the environment.""" + + allocated = 0 + available = 0 + + for session in list(self.sessions.values()): + if session.phase == "Allocated": + allocated += 1 + elif session.phase == "Available": + available += 1 + + self.allocated = allocated + self.available = available + + logger.info( + "Recalculated capacity for environment %s of portal %s in cluster %s: %s", + self.name, + self.portal.name, + self.portal.cluster.name, + {"allocated": allocated, "available": available}, + ) + + async def request_workshop_session( + self, user_id: str, parameters: List[Dict[str, str]], index_url: str + ) -> Dict[str, str] | None: + """Request a workshop session for a user.""" + + portal = self.portal + + async with ClientSession() as http_client: + async with portal.client_session(http_client) as portal_client: + if not portal_client.connected: + return + + return await portal_client.request_workshop_session( + environment_name=self.name, + user_id=user_id, + parameters=parameters, + index_url=index_url, + ) diff --git a/lookup-service/service/caches/portals.py b/lookup-service/service/caches/portals.py new file mode 100644 index 000000000..6ee0f1571 --- /dev/null +++ b/lookup-service/service/caches/portals.py @@ -0,0 +1,346 @@ +"""Configuration database for training portals.""" + +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union + +from aiohttp import BasicAuth, ClientSession + +from .clusters import ClusterConfig + +if TYPE_CHECKING: + from .environments import WorkshopEnvironment + from .sessions import WorkshopSession + + +logger = logging.getLogger("educates") + + +@dataclass +class PortalCredentials: + """Configuration object for a portal's authentication.""" + + client_id: str + client_secret: str + username: str + password: str + + +@dataclass +class TrainingPortal: + """Snapshot of training portal state. 
This includes a database of the + workshop environments managed by the training portal.""" + + cluster: ClusterConfig + name: str + uid: str + generation: int + labels: Dict[Tuple[str, str], str] + url: str + credentials: PortalCredentials + phase: str + capacity: int + allocated: int + environments: Dict[str, "WorkshopEnvironment"] + + def __init__( + self, + cluster: ClusterConfig, + name: str, + uid: str, + generation: int, + labels: Dict[str, str], + url: str, + credentials: PortalCredentials, + phase: str, + capacity: int, + allocated: int, + ) -> None: + self.cluster = cluster + self.name = name + self.uid = uid + self.generation = generation + self.labels = labels + self.url = url + self.credentials = credentials + self.phase = phase + self.capacity = capacity + self.allocated = allocated + self.environments = {} + + def get_environments(self) -> List["WorkshopEnvironment"]: + """Returns all workshop environments.""" + + return list(self.environments.values()) + + def get_running_environments(self) -> List["WorkshopEnvironment"]: + """Returns all running workshop environments.""" + + return [ + environment + for environment in self.environments.values() + if environment.phase == "Running" + ] + + def get_environment(self, environment_name: str) -> "WorkshopEnvironment": + """Returns a workshop environment by name.""" + + return self.environments.get(environment_name) + + def add_environment(self, environment: "WorkshopEnvironment") -> None: + """Add a workshop environment to the portal.""" + + self.environments[environment.name] = environment + + def remove_environment(self, environment_name: str) -> None: + """Remove a workshop environment from the portal.""" + + self.environments.pop(environment_name, None) + + def hosts_workshop(self, workshop_name: str) -> bool: + """Check if the portal hosts a workshop.""" + + for environment in self.environments.values(): + if environment.workshop == workshop_name: + return True + + return False + + def recalculate_capacity(self) -> None: + """Recalculate the capacity of the portal.""" + + for environment in self.environments.values(): + environment.recalculate_capacity() + + self.allocated = sum( + environment.allocated for environment in self.environments.values() + ) + + logger.info( + "Recalculated capacity for portal %s in cluster %s: %s", + self.name, + self.cluster.name, + {"allocated": self.allocated, "capacity": self.capacity}, + ) + + def find_existing_workshop_session_for_user( + self, user_id: str, workshop_name: str + ) -> Union["WorkshopSession", None]: + """Find an existing workshop session for a user.""" + + for environment in self.environments.values(): + for session in environment.get_sessions(): + if ( + session.user == user_id + and session.environment.workshop == workshop_name + ): + return session + + return None + + def client_session(self, session: ClientSession) -> "TrainingPortalClientSession": + """Create a HTTP client session for accessing the remote training + portal.""" + + return TrainingPortalClientSession(self, session) + + +@dataclass +class TrainingPortalClientSession: + """HTTP client session for accessing the remote training portal.""" + + portal: TrainingPortal + session: ClientSession + access_token: str | None + + def __init__(self, portal: TrainingPortal, session: ClientSession) -> None: + self.portal = portal + self.session = session + self.access_token = None + + async def __aenter__(self) -> "TrainingPortalClientSession": + """Login to the portal service.""" + + await self.login() + + return self + + 
async def __aexit__(self, exc_type, exc_value, traceback) -> None: + """Logout from the portal service.""" + + await self.logout() + + @property + def connected(self): + """Check if the client session is connected.""" + + return bool(self.access_token) + + async def login(self) -> bool: + """Login to the portal service .""" + + async with self.session.post( + f"{self.portal.url}/oauth2/token/", + data={ + "grant_type": "password", + "username": self.portal.credentials.username, + "password": self.portal.credentials.password, + }, + auth=BasicAuth( + self.portal.credentials.client_id, self.portal.credentials.client_secret + ), + ) as response: + if response.status != 200: + logger.error( + "Failed to login to portal %s of cluster %s.", + self.portal.name, + self.portal.cluster.name, + ) + + return False + + data = await response.json() + + self.access_token = data.get("access_token") + + return True + + async def logout(self) -> None: + """Logout from the portal service.""" + + if not self.connected: + return + + async with self.session.post( + f"{self.portal.url}/oauth2/revoke-token/", + data={ + "client_id": self.portal.credentials.client_id, + "client_secret": self.portal.credentials.client_secret, + "token": self.access_token, + }, + ) as response: + if response.status != 200: + logger.error( + "Failed to logout from portal %s of cluster %s.", + self.portal.name, + self.portal.cluster.name, + ) + + async def user_sessions(self, user_id: str) -> List[Dict[str, Any]]: + """Fetches the list of active sessions for a user.""" + + if not self.connected: + return {} + + headers = {"Authorization": f"Bearer {self.access_token}"} + + async with self.session.get( + f"{self.portal.url}/workshops/user/{user_id}/sessions/", + headers=headers, + ) as response: + if response.status != 200: + logger.error( + "Failed to get sessions from portal %s of cluster %s for user %s.", + self.portal.name, + self.portal.cluster.name, + user_id, + ) + + return {} + + return await response.json() + + async def reacquire_workshop_session( + self, user_id: str, environment_name: str, session_name: str, index_url: str + ) -> Dict[str, str] | None: + """Reacquire a workshop session for a user.""" + + if not self.connected: + return + + if not session_name: + return + + headers = {"Authorization": f"Bearer {self.access_token}"} + + async with self.session.get( + f"{self.portal.url}/workshops/environment/{environment_name}/request/", + headers=headers, + params={ + "index_url": index_url, + "user": user_id, + "session": session_name, + }, + ) as response: + if response.status != 200: + logger.error( + "Failed to reacquire session %s from portal %s of cluster %s for user %s.", + session_name, + self.portal.name, + self.portal.cluster.name, + user_id, + ) + + return + + data = await response.json() + + url = data.get("url") + + if url: + return { + "clusterName": self.portal.cluster.name, + "portalName": self.portal.name, + "environmentName": environment_name, + "sessionName": session_name, + "clientUserId": user_id, + "sessionActionvationUrl": f"{self.portal.url}{url}", + } + + async def request_workshop_session( + self, + environment_name: str, + user_id: str, + parameters: Dict[Tuple[str, str], str], + index_url: str, + ) -> Dict[str, str] | None: + """Request a workshop session for a user.""" + + if not self.connected: + return + + headers = {"Authorization": f"Bearer {self.access_token}"} + + async with self.session.get( + f"{self.portal.url}/workshops/environment/{environment_name}/request/", + headers=headers, + 
params={ + "user": user_id, + "parameters": parameters, + "index_url": index_url, + }, + ) as response: + if response.status != 200: + logger.error( + "Failed to request session from portal %s of cluster %s for user %s.", + self.portal.name, + self.portal.cluster.name, + user_id, + ) + + return + + data = await response.json() + + url = data.get("url") + session_name = data.get("name") + + if url: + return { + "clusterName": self.portal.cluster.name, + "portalName": self.portal.name, + "environmentName": environment_name, + "sessionName": session_name, + "clientUserId": user_id, + "sessionActionvationUrl": f"{self.portal.url}{url}", + } diff --git a/lookup-service/service/caches/sessions.py b/lookup-service/service/caches/sessions.py new file mode 100644 index 000000000..96b2abc89 --- /dev/null +++ b/lookup-service/service/caches/sessions.py @@ -0,0 +1,37 @@ +"""Model objects for workshop sessions.""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict + +from aiohttp import ClientSession + +if TYPE_CHECKING: + from .environments import WorkshopEnvironment + + +@dataclass +class WorkshopSession: + """Snapshot of workshop session state.""" + + environment: "WorkshopEnvironment" + name: str + generation: int + phase: str + user: str + + async def reacquire_workshop_session(self, index_url: str) -> Dict[str, str] | None: + """Reacquire a workshop session for a user.""" + + portal = self.environment.portal + + async with ClientSession() as http_client: + async with portal.client_session(http_client) as portal_client: + if not portal_client.connected: + return + + return await portal_client.reacquire_workshop_session( + self.user, + environment_name=self.environment.name, + session_name=self.name, + index_url=index_url, + ) diff --git a/lookup-service/service/caches/tenants.py b/lookup-service/service/caches/tenants.py new file mode 100644 index 000000000..8bdbd7d42 --- /dev/null +++ b/lookup-service/service/caches/tenants.py @@ -0,0 +1,71 @@ +"""Configuration database for training plaform tenants.""" + +from dataclasses import dataclass +from typing import Any, Dict, List + +from ..helpers.selectors import ResourceSelector +from .clusters import ClusterConfig +from .databases import cluster_database +from .portals import TrainingPortal + + +@dataclass +class TenantConfig: + """Configuration object for a tenant of the training platform.""" + + name: str + clusters: ResourceSelector + portals: ResourceSelector + + def __init__(self, name: str, clusters: Dict[str, Any], portals: Dict[str, Any]): + self.name = name + self.clusters = ResourceSelector(clusters) + self.portals = ResourceSelector(portals) + + def allowed_access_to_cluster(self, cluster: ClusterConfig) -> bool: + """Check if the tenant has access to the cluster.""" + + # Fake up a resource metadata object for the cluster. + + resource = { + "metadata": { + "name": cluster.name, + "uid": cluster.uid, + "labels": cluster.labels, + }, + } + + return self.clusters.match_resource(resource) + + def allowed_access_to_portal(self, portal: TrainingPortal) -> bool: + """Check if the tenant has access to the portal.""" + + # Fake up a resource metadata object for the portal. + + resource = { + "metadata": { + "name": portal.name, + "labels": portal.labels, + }, + } + + return self.portals.match_resource(resource) + + def portals_which_are_accessible(self) -> List[TrainingPortal]: + """Retrieve a list of training portals accessible by a tenant.""" + + # Get the list of clusters and portals that match the tenant's rules. 
+        # To do this we iterate over all the portals and for each portal we then
+        # check the cluster it belongs to against the tenant's cluster rules.
+        # If the portal's cluster matches the tenant's cluster rules, we then
+        # check the portal itself against the tenant's portal rules.
+
+        accessible_portals = []
+
+        for cluster in cluster_database.get_clusters():
+            if self.allowed_access_to_cluster(cluster):
+                for portal in cluster.get_portals():
+                    if self.allowed_access_to_portal(portal):
+                        accessible_portals.append(portal)
+
+        return accessible_portals
diff --git a/lookup-service/service/config.py b/lookup-service/service/config.py
new file mode 100644
index 000000000..4b906e43b
--- /dev/null
+++ b/lookup-service/service/config.py
@@ -0,0 +1,36 @@
+"""Configuration for the lookup service."""
+
+import functools
+import random
+
+
+@functools.lru_cache(maxsize=1)
+def jwt_token_secret() -> str:
+    """Return the application secret key used to sign the JWT tokens. If we are
+    running inside a Kubernetes cluster, we use the in-cluster Kubernetes access
+    token as the secret key. Otherwise, we generate a random secret key. The
+    result is cached to avoid regenerating the secret key for each request. This
+    means that for randomly generated keys, the key will be the same for the
+    life of the process. In the case of running in a Kubernetes cluster, the
+    secret key will be the same for the life of the container the process runs
+    in, with subsequent instances of the container using the same secret key,
+    so long as the Kubernetes access token hasn't been rotated. When the pod is
+    restarted after the Kubernetes access token has rotated, a new secret key
+    will be generated and clients will need to log in again.
+    """
+
+    # Check if we are running inside a Kubernetes cluster and if we are, use the
+    # Kubernetes access token as the secret key.
+
+    try:
+        with open(
+            "/var/run/secrets/kubernetes.io/serviceaccount/token", encoding="utf-8"
+        ) as f:
+            return f.read()
+
+    except FileNotFoundError:
+        # Generate a random secret key using random.choice() to select from a
+        # string of characters.
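+        # NOTE: random.choice() is not a cryptographically secure source of
+        # randomness. If that matters for a JWT signing key, the standard
+        # library's secrets.choice() is a drop-in replacement here.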
+ + characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + return "".join(random.choice(characters) for _ in range(64)) diff --git a/lookup-service/service/handlers/__init__.py b/lookup-service/service/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lookup-service/service/handlers/clients.py b/lookup-service/service/handlers/clients.py new file mode 100644 index 000000000..31bb49867 --- /dev/null +++ b/lookup-service/service/handlers/clients.py @@ -0,0 +1,68 @@ +"""Operator handlers for client configuration resources.""" + +import logging +from typing import Any, Dict + +import kopf + +from ..caches.clients import ClientConfig +from ..helpers.objects import xgetattr +from ..service import ServiceState + +logger = logging.getLogger("educates") + + +@kopf.on.resume("clientconfigs.lookup.educates.dev") +@kopf.on.create("clientconfigs.lookup.educates.dev") +@kopf.on.update("clientconfigs.lookup.educates.dev") +def clientconfigs_update( + name: str, meta: kopf.Meta, spec: kopf.Spec, memo: ServiceState, reason: str, **_ +) -> Dict[str, Any]: + """Add the client configuration to the client database.""" + + generation = meta["generation"] + + client_name = name + + client_uid = xgetattr(meta, "uid") + client_password = xgetattr(spec, "client.password") + client_tenants = xgetattr(spec, "tenants", []) + client_roles = xgetattr(spec, "roles", []) + + logger.info( + "%s client configuration %r with generation %s.", + (reason == "update") and "Update" or "Register", + name, + generation, + ) + + client_database = memo.client_database + + client_database.update_client( + ClientConfig( + name=client_name, + uid=client_uid, + password=client_password, + tenants=client_tenants, + roles=client_roles, + ) + ) + + return {} + + +@kopf.on.delete("clientconfigs.lookup.educates.dev") +def clientconfigs_delete(name: str, meta: kopf.Meta, memo: ServiceState, **_) -> None: + """Remove the client configuration from the client database.""" + + generation = meta["generation"] + + client_database = memo.client_database + + client_name = name + + logger.info( + "Discard client configuration %r with generation %s.", client_name, generation + ) + + client_database.remove_client(client_name) diff --git a/lookup-service/service/handlers/clusters.py b/lookup-service/service/handlers/clusters.py new file mode 100644 index 000000000..b5c2cdf03 --- /dev/null +++ b/lookup-service/service/handlers/clusters.py @@ -0,0 +1,568 @@ +"""Operator handlers for cluster configuration resources.""" + +import asyncio +import base64 +import logging +from typing import Any, Dict + +import kopf +import yaml +from wrapt import synchronized + +from ..caches.clusters import ClusterConfig +from ..caches.environments import WorkshopEnvironment +from ..caches.portals import PortalCredentials, TrainingPortal +from ..caches.sessions import WorkshopSession +from ..helpers.kubeconfig import ( + create_kubeconfig_from_access_token_secret, + extract_context_from_kubeconfig, + verify_kubeconfig_format, +) +from ..helpers.objects import xgetattr +from ..helpers.operator import GenericOperator +from ..service import ServiceState + +logger = logging.getLogger("educates") + + +@kopf.index("secrets", when=lambda body, **_: body.get("type") == "Opaque") +def secrets_index(namespace: str, name: str, body: kopf.Body, **_) -> dict: + """Keeps an index of secret data by namespace and name. 
This is so we can + easily retrieve the kubeconfig data for each cluster when starting the + training platform operator.""" + + # Note that under normal circumstances only a single namespace should be + # monitored, thus we are not caching secrets from the whole cluster but + # only where the operator is deployed. This is to avoid potential security + # issues and memory bloat from caching secrets from the whole cluster. + + return {(namespace, name): xgetattr(body, "data", {})} + + +@kopf.on.resume("clusterconfigs.lookup.educates.dev") +@kopf.on.create("clusterconfigs.lookup.educates.dev") +@kopf.on.update("clusterconfigs.lookup.educates.dev") +def clusterconfigs_update( + namespace: str, + name: str, + uid: str, + meta: kopf.Meta, + spec: kopf.Spec, + secrets_index: Dict[str, Any], + memo: ServiceState, + retry: int, + **_, +): # pylint: disable=redefined-outer-name + """Add the cluster configuration to the cluster database.""" + + generation = meta.get("generation") + + # We need to cache the kubeconfig data. This can be provided in a separate + # secret or it can be read from a mounted secret for the case of the local + # cluster. + + secret_ref_name = xgetattr(spec, "credentials.kubeconfig.secretRef.name") + + if secret_ref_name is not None: + config_key = xgetattr(spec, "credentials.kubeconfig.secretRef.key", "config") + + # Make sure the secret holding the kubeconfig has been seen already and + # that the key for the kubeconfig file is present in the data. + + if (namespace, secret_ref_name) not in secrets_index: + raise kopf.TemporaryError( + f"Secret {secret_ref_name} required for cluster configuration {name} not found.", + delay=5, + ) + + cluster_config_data, *_ = secrets_index[(namespace, secret_ref_name)] + + if config_key not in cluster_config_data: + raise kopf.TemporaryError( + f"Key {config_key} not found in secret {secret_ref_name} required for cluster configuration {name}.", # pylint: disable=line-too-long + delay=5 if not retry else 15, + ) + + # Decode the base64 encoded kubeconfig data and load it as a yaml + # document. + + try: + kubeconfig = yaml.safe_load( + base64.b64decode( + xgetattr(cluster_config_data, config_key, "").encode("UTF-8") + ) + ) + except yaml.YAMLError as exc: + raise kopf.TemporaryError( + f"Failed to load kubeconfig data from secret {secret_ref_name} required for cluster configuration {name}.", # pylint: disable=line-too-long + delay=5 if not retry else 15, + ) from exc + + try: + verify_kubeconfig_format(kubeconfig) + except ValueError as exc: + raise kopf.TemporaryError( + f"Invalid kubeconfig data in secret {secret_ref_name} required for cluster configuration {name}.", # pylint: disable=line-too-long + delay=5 if not retry else 15, + ) from exc + + # Extract only the context from the kubeconfig file that is required + # for the cluster configuration. + + try: + kubeconfig = extract_context_from_kubeconfig( + kubeconfig, xgetattr(spec, "credentials.kubeconfig.context") + ) + except ValueError as exc: + raise kopf.TemporaryError( + f"Failed to extract kubeconfig context from secret {secret_ref_name} required for cluster configuration {name}.", # pylint: disable=line-too-long + delay=5 if not retry else 15, + ) from exc + + else: + # For the local cluster, we access credentials for accessing the cluster + # from a mounted Kubernetes access token secret. Note that we do not + # know the external URL of the local cluster, so we use the internal + # Kubernetes service URL. 
This will need to be replaced if the
+        # kubeconfig is used for accessing the cluster from outside the cluster.
+
+        server = "https://kubernetes.default.svc"
+
+        # TODO: Make the path to the access token secret configurable.
+
+        kubeconfig = create_kubeconfig_from_access_token_secret(
+            "/opt/cluster-access-token", name, server
+        )
+
+    # Update the cluster configuration in the cluster database.
+
+    cluster_database = memo.cluster_database
+
+    with synchronized(cluster_database):
+        cluster_config = cluster_database.get_cluster(name)
+
+        if not cluster_config:
+            logger.info(
+                "Registering cluster configuration %r with generation %s.",
+                name,
+                generation,
+            )
+
+            cluster_database.add_cluster(
+                ClusterConfig(
+                    name=name,
+                    uid=uid,
+                    labels=xgetattr(spec, "labels", {}),
+                    kubeconfig=kubeconfig,
+                )
+            )
+
+        else:
+            logger.info(
+                "Updating cluster configuration %r with generation %s.",
+                name,
+                generation,
+            )
+
+            cluster_config.labels = xgetattr(spec, "labels", {})
+            cluster_config.kubeconfig = kubeconfig
+
+
+@kopf.on.delete("clusterconfigs.lookup.educates.dev")
+def clusterconfigs_delete(name: str, meta: kopf.Meta, memo: ServiceState, **_):
+    """Remove the cluster configuration from the cluster database."""
+
+    generation = meta.get("generation")
+
+    cluster_database = memo.cluster_database
+
+    logger.info("Delete cluster configuration %r with generation %s.", name, generation)
+
+    cluster_database.remove_cluster(name)
+
+
+class ClusterOperator(GenericOperator):
+    """Operator for interacting with training platform on separate cluster."""
+
+    def __init__(self, cluster_config: ClusterConfig, service_state: ServiceState) -> None:
+        """Initializes the operator."""
+
+        super().__init__(cluster_config, service_state=service_state)
+
+    def register_handlers(self) -> None:
+        """Register the handlers for the training platform operator."""
+
+        @kopf.on.event(
+            "trainingportals.training.educates.dev",
+            registry=self.operator_registry,
+        )
+        async def trainingportals_event(event: kopf.RawEvent, **_):
+            """Handles events for training portals."""
+
+            body = xgetattr(event, "object", {})
+            metadata = xgetattr(body, "metadata", {})
+            spec = xgetattr(body, "spec", {})
+            status = xgetattr(body, "status", {})
+
+            portal_name = xgetattr(metadata, "name")
+
+            if xgetattr(event, "type") == "DELETED":
+                logger.info(
+                    "Discard training portal %s of cluster %s",
+                    portal_name,
+                    self.cluster_name,
+                )
+
+                self.cluster_config.remove_portal(portal_name)
+
+            else:
+                credentials = PortalCredentials(
+                    client_id=xgetattr(status, "educates.clients.robot.id"),
+                    client_secret=xgetattr(status, "educates.clients.robot.secret"),
+                    username=xgetattr(status, "educates.credentials.robot.username"),
+                    password=xgetattr(status, "educates.credentials.robot.password"),
+                )
+
+                with synchronized(self.cluster_config):
+                    portal_state = self.cluster_config.get_portal(portal_name)
+
+                    if not portal_state:
+                        logger.info(
+                            "Registering training portal %s of cluster %s",
+                            portal_name,
+                            self.cluster_name,
+                        )
+
+                        self.cluster_config.add_portal(
+                            TrainingPortal(
+                                cluster=self.cluster_config,
+                                name=portal_name,
+                                uid=xgetattr(metadata, "uid"),
+                                generation=xgetattr(metadata, "generation"),
+                                labels=xgetattr(spec, "portal.labels", {}),
+                                url=xgetattr(status, "educates.url"),
+                                phase=xgetattr(status, "educates.phase"),
+                                credentials=credentials,
+                                capacity=xgetattr(spec, "portal.sessions.maximum", 0),
+                                allocated=0,
+                            )
+                        )
+
+                        portal_state = self.cluster_config.get_portal(portal_name)
+
+                    else:
+                        logger.info(
+                            "Updating training portal %s of cluster %s",
portal_name, + self.cluster_name, + ) + + portal_state.generation = xgetattr(metadata, "generation") + portal_state.labels = xgetattr(spec, "portal.labels", {}) + portal_state.url = xgetattr(status, "educates.url") + portal_state.phase = xgetattr(status, "educates.phase") + portal_state.credentials = credentials + portal_state.capacity = xgetattr( + spec, "portal.sessions.maximum", 0 + ) + + with synchronized(portal_state): + portal_state.recalculate_capacity() + + @kopf.on.event( + "workshopenvironments.training.educates.dev", + labels={"training.educates.dev/portal.name": kopf.PRESENT}, + registry=self.operator_registry, + ) + async def workshopenvironments_event(event: kopf.RawEvent, **_): + """Handles events for workshop environments.""" + + body = xgetattr(event, "object", {}) + metadata = xgetattr(body, "metadata", {}) + spec = xgetattr(body, "spec", {}) + status = xgetattr(body, "status", {}) + + portal_name = xgetattr(metadata, "labels", {}).get( + "training.educates.dev/portal.name" + ) + environment_name = xgetattr(metadata, "name") + workshop_name = xgetattr(spec, "workshop.name") + + workshop_generation = xgetattr(status, "educates.workshop.generation", 0) + workshop_spec = xgetattr(status, "educates.workshop.spec", {}) + + portal = self.cluster_config.get_portal(portal_name) + + if xgetattr(event, "type") == "DELETED": + if portal: + with synchronized(portal): + logger.info( + "Discard workshop environment %s for workshop %s from portal %s of cluster %s", # pylint: disable=line-too-long + environment_name, + workshop_name, + portal_name, + self.cluster_name, + ) + + portal.remove_environment(environment_name) + portal.recalculate_capacity() + + else: + logger.info( + "Discard workshop environment %s for workshop %s from portal %s of cluster %s as portal not found", # pylint: disable=line-too-long + environment_name, + workshop_name, + portal_name, + self.cluster_name, + ) + + else: + while not portal: + logger.warning( + "Portal %s not found for workshop environment %s of cluster %s, sleeping...", # pylint: disable=line-too-long + portal_name, + environment_name, + self.cluster_name, + ) + + # TODO How should we fail this if the portal is not found + # after a certain number of retries? Will continually + # retrying hold up ability to handle other events for the + # same resource type? 
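+                    # One possible answer (a sketch only, not what the code
+                    # below does) would be to bound the wait and drop the
+                    # event after some number of attempts, e.g.:
+                    #
+                    #     attempts += 1
+                    #     if attempts > 30:
+                    #         logger.error("Giving up waiting for portal %s.", portal_name)
+                    #         return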
+ + await asyncio.sleep(2.0) + + portal = self.cluster_config.get_portal(portal_name) + + with synchronized(portal): + environment_state = portal.get_environment(environment_name) + + if not environment_state: + logger.info( + "Registering workshop environment %s for workshop %s from portal %s of cluster %s", # pylint: disable=line-too-long + environment_name, + workshop_name, + portal_name, + self.cluster_name, + ) + + portal.add_environment( + WorkshopEnvironment( + portal=portal, + name=environment_name, + generation=workshop_generation, + workshop=workshop_name, + title=xgetattr(workshop_spec, "title"), + description=xgetattr(workshop_spec, "description"), + labels=xgetattr(workshop_spec, "labels", {}), + capacity=xgetattr(status, "educates.capacity", 0), + reserved=xgetattr(status, "educates.reserved", 0), + allocated=0, + available=0, + phase=xgetattr(status, "educates.phase"), + ) + ) + + else: + logger.info( + "Updating workshop environment %s for workshop %s from portal %s of cluster %s", # pylint: disable=line-too-long + environment_name, + workshop_name, + portal_name, + self.cluster_name, + ) + + environment_state.generation = workshop_generation + environment_state.title = xgetattr(workshop_spec, "title") + environment_state.description = xgetattr( + workshop_spec, "description" + ) + environment_state.labels = xgetattr(workshop_spec, "labels", {}) + + environment_state.phase = xgetattr(status, "educates.phase") + + environment_state.capacity = xgetattr( + status, "educates.capacity", 0 + ) + environment_state.reserved = xgetattr( + status, "educates.reserved", 0 + ) + + portal.recalculate_capacity() + + @kopf.on.event( + "workshopsessions.training.educates.dev", + labels={ + "training.educates.dev/portal.name": kopf.PRESENT, + "training.educates.dev/environment.name": kopf.PRESENT, + }, + registry=self.operator_registry, + ) + async def workshopsessions_event(event: kopf.RawEvent, **_): + """Handles events for workshop sessions.""" + + body = xgetattr(event, "object", {}) + metadata = xgetattr(body, "metadata", {}) + status = xgetattr(body, "status", {}) + + portal_name = xgetattr(metadata, "labels", {}).get( + "training.educates.dev/portal.name" + ) + environment_name = xgetattr(metadata, "labels", {}).get( + "training.educates.dev/environment.name" + ) + session_name = xgetattr(metadata, "name") + + portal = self.cluster_config.get_portal(portal_name) + + if xgetattr(event, "type") == "DELETED": + if portal: + environment = portal.get_environment(environment_name) + + if environment: + with synchronized(portal): + logger.info( + "Discard workshop session %s for environment %s from portal %s of cluster %s", # pylint: disable=line-too-long + session_name, + environment_name, + portal_name, + self.cluster_name, + ) + + environment.remove_session(session_name) + portal.recalculate_capacity() + + else: + logger.info( + "Discard workshop session %s for environment %s from portal %s of cluster %s as environment not found", # pylint: disable=line-too-long + session_name, + environment_name, + portal_name, + self.cluster_name, + ) + + else: + logger.info( + "Discard workshop session %s for environment %s from portal %s of cluster %s as portal not found", # pylint: disable=line-too-long + session_name, + environment_name, + portal_name, + self.cluster_name, + ) + + else: + portal = self.cluster_config.get_portal(portal_name) + + while not portal: + logger.warning( + "Portal %s not found for workshop session %s of cluster %s, sleeping...", + portal_name, + session_name, + 
self.cluster_name, + ) + + # TODO How should we fail this if the portal is not found + # after a certain number of retries? Will continually + # retrying hold up ability to handle other events for the + # same resource type? + + await asyncio.sleep(2.0) + + portal = self.cluster_config.get_portal(portal_name) + + environment = portal.get_environment(environment_name) + + while not environment: + logger.warning( + "Environment %s not found for workshop session %s of cluster %s, sleeping...", # pylint: disable=line-too-long + environment_name, + session_name, + self.cluster_name, + ) + + # TODO How should we fail this if the portal is not found + # after a certain number of retries? Will continually + # retrying hold up ability to handle other events for the + # same resource type? + + await asyncio.sleep(2.0) + + environment = portal.get_environment(environment_name) + + with synchronized(portal): + session_state = environment.get_session(session_name) + + if not session_state: + logger.info( + "Registering workshop session %s for environment %s from portal %s of cluster %s, where user is %r", # pylint: disable=line-too-long + session_name, + environment_name, + portal_name, + self.cluster_name, + xgetattr(status, "educates.user"), + ) + + environment.add_session( + WorkshopSession( + environment=environment, + name=session_name, + generation=xgetattr(metadata, "generation"), + phase=xgetattr(status, "educates.phase"), + user=xgetattr(status, "educates.user"), + ) + ) + + else: + logger.info( + "Updating workshop session %s for environment %s from portal %s of cluster %s, where user is %r", # pylint: disable=line-too-long + session_name, + environment_name, + portal_name, + self.cluster_name, + xgetattr(status, "educates.user"), + ) + + session_state.generation = xgetattr(metadata, "generation") + session_state.phase = xgetattr(status, "educates.phase") + session_state.user = xgetattr(status, "educates.user") + + portal.recalculate_capacity() + + +@kopf.daemon( + "clusterconfigs.lookup.educates.dev", + cancellation_backoff=5.0, + cancellation_polling=5.0, +) +def clusterconfigs_daemon( + stopped: kopf.DaemonStopped, + name: str, + uid: str, + retry: int, + memo: ServiceState, + **_, +) -> None: + """Starts an instance of the cluster operator for each registered cluster + and waits for it to complete.""" + + # Make sure we have separately processed the cluster config resource so + # that an item exists for it in the cache and it has the same uid. + + cache = memo.cluster_database + + cluster_config = cache.get_cluster(name) + + if not cluster_config or cluster_config.uid != uid: + raise kopf.TemporaryError( + f"Cluster {name} with uid {uid} not found.", + delay=5 if not retry else 15, + ) + + # Start the cluster operator and wait for it to complete. An infinite loop + # is used to keep the daemon thread running until the daemon is stopped as + # kopf framework expects this daemon thread to be running indefinitely until + # it is stopped. 
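+    # Note that the kopf-supplied `stopped` flag is handed to
+    # run_until_stopped() below so the operator thread can wind down cleanly
+    # when this daemon is cancelled, for example when the ClusterConfig
+    # resource is deleted.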
+ + operator = ClusterOperator(cluster_config, memo) + + operator.run_until_stopped(stopped) diff --git a/lookup-service/service/handlers/tenants.py b/lookup-service/service/handlers/tenants.py new file mode 100644 index 000000000..39e735036 --- /dev/null +++ b/lookup-service/service/handlers/tenants.py @@ -0,0 +1,64 @@ +"""Operator handlers for tenant configuration resources.""" + +import logging +from typing import Any, Dict + +import kopf + +from ..caches.tenants import TenantConfig +from ..helpers.objects import xgetattr +from ..service import ServiceState + +logger = logging.getLogger("educates") + + +@kopf.on.resume("tenantconfigs.lookup.educates.dev") +@kopf.on.create("tenantconfigs.lookup.educates.dev") +@kopf.on.update("tenantconfigs.lookup.educates.dev") +def tenantconfigs_update( + name: str, meta: kopf.Meta, spec: kopf.Spec, memo: ServiceState, reason: str, **_ +) -> Dict[str, Any]: + """Add the tenant configuration to the tenant database.""" + + generation = meta["generation"] + + tenant_name = name + + tenant_clusters = xgetattr(spec, "clusters", {}) + tenant_portals = xgetattr(spec, "portals", {}) + + logger.info( + "%s tenant configuration %r with generation %s.", + (reason == "update") and "Update" or "Register", + name, + generation, + ) + + tenant_database = memo.tenant_database + + tenant_database.update_tenant( + TenantConfig( + name=tenant_name, + clusters=tenant_clusters, + portals=tenant_portals, + ) + ) + + return {} + + +@kopf.on.delete("tenantconfigs.lookup.educates.dev") +def tenantconfigs_delete(name: str, meta: kopf.Meta, memo: ServiceState, **_) -> None: + """Remove the tenant configuration from the tenant database.""" + + generation = meta["generation"] + + tenant_database = memo.tenant_database + + tenant_name = name + + logger.info( + "Discard tenant configuration %r with generation %s.", tenant_name, generation + ) + + tenant_database.remove_tenant(tenant_name) diff --git a/lookup-service/service/helpers/__init__.py b/lookup-service/service/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lookup-service/service/helpers/kubeconfig.py b/lookup-service/service/helpers/kubeconfig.py new file mode 100644 index 000000000..95316292f --- /dev/null +++ b/lookup-service/service/helpers/kubeconfig.py @@ -0,0 +1,240 @@ +"""Helper functions for working with kubeconfig files.""" + +import base64 +from typing import Union + +import kopf + +# The kubeconfig file is a YAML file with the following structure: +# +# apiVersion: v1 +# kind: Config +# clusters: +# - name: cluster-name +# cluster: +# server: https://kubernetes.default.svc +# certificate-authority-data: +# contexts: +# - name: cluster-name-context +# context: +# cluster: cluster-name +# user: cluster-name-user +# current-context: cluster-name-context +# users: +# - name: cluster-name-user +# user: +# token: + + +def create_kubeconfig_from_access_token_secret( + directory: str, + cluster_name: str, + server_url: str = "https://kubernetes.default.svc", +) -> dict: + """Creates a kubeconfig from mounted access token secret.""" + + # The mounted directory is a volume created from the Kubernetes service + # account token and CA certificate. We want to create a kubeconfig file that + # uses these to access the Kubernetes API. First read the service account + # token from the mounted directory. + + with open(f"{directory}/token", "r", encoding="utf-8") as token_file: + token = token_file.read().strip() + + # Read the CA certificate from the mounted directory. 
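+    # (The mounted directory follows the layout of a standard Kubernetes
+    # service account token volume, so ca.crt sits alongside the token file
+    # read above.)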
+
+    with open(f"{directory}/ca.crt", "rb") as ca_file:
+        ca_certificate_bytes = ca_file.read().strip()
+
+    # Create the kubeconfig file.
+
+    kubeconfig = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "clusters": [
+            {
+                "name": cluster_name,
+                "cluster": {
+                    "server": server_url,
+                    "certificate-authority-data": base64.b64encode(
+                        ca_certificate_bytes
+                    ).decode("utf-8"),
+                },
+            }
+        ],
+        "contexts": [
+            {
+                "name": f"{cluster_name}-context",
+                "context": {
+                    "cluster": cluster_name,
+                    "user": f"{cluster_name}-user",
+                },
+            }
+        ],
+        "current-context": f"{cluster_name}-context",
+        "users": [
+            {
+                "name": f"{cluster_name}-user",
+                "user": {
+                    "token": token,
+                },
+            }
+        ],
+    }
+
+    return kubeconfig
+
+
+def verify_kubeconfig_format(kubeconfig: dict) -> None:
+    """Verifies that a kubeconfig file is well-formed."""
+
+    # Verify the kubeconfig file has the correct structure.
+
+    if (
+        kubeconfig.get("apiVersion") != "v1"
+        or kubeconfig.get("kind") != "Config"
+        or not isinstance(kubeconfig.get("clusters"), list)
+        or not isinstance(kubeconfig.get("contexts"), list)
+        or not isinstance(kubeconfig.get("users"), list)
+        or not isinstance(kubeconfig.get("current-context"), str)
+    ):
+        raise ValueError("Invalid kubeconfig file format.")
+
+    for cluster in kubeconfig.get("clusters", []):
+        if (
+            not isinstance(cluster, dict)
+            or not isinstance(cluster.get("name"), str)
+            or not isinstance(cluster.get("cluster"), dict)
+            or not isinstance(cluster["cluster"].get("server"), str)
+            or not isinstance(cluster["cluster"].get("certificate-authority-data"), str)
+        ):
+            raise ValueError("Invalid kubeconfig file format.")
+
+    for context in kubeconfig.get("contexts", []):
+        if (
+            not isinstance(context, dict)
+            or not isinstance(context.get("name"), str)
+            or not isinstance(context.get("context"), dict)
+            or not isinstance(context["context"].get("cluster"), str)
+            or not isinstance(context["context"].get("user"), str)
+        ):
+            raise ValueError("Invalid kubeconfig file format.")
+
+    for user in kubeconfig.get("users", []):
+        if (
+            not isinstance(user, dict)
+            or not isinstance(user.get("name"), str)
+            or not isinstance(user.get("user"), dict)
+            or not isinstance(user["user"].get("token"), str)
+        ):
+            raise ValueError("Invalid kubeconfig file format.")
+
+
+def extract_context_from_kubeconfig(
+    kubeconfig: dict, context: Union[str, None] = None
+) -> dict:
+    """Extracts a context from a kubeconfig file. If the context is not
+    specified, the current context is extracted, or if no current context then
+    use the first context found. Leave the certificate data in its base64
+    encoded form. Assume that the kubeconfig file is well-formed, does not need
+    validation and the context exists. Also assume that it only provides
+    certificate authority data and a token for authentication and that it does
+    not use a client certificate."""
+
+    # If no context provided see if the current context is specified in the
+    # kubeconfig file data, otherwise use the first context found.
+
+    if context is None:
+        context = kubeconfig.get("current-context")
+
+    if context is None:
+        context = kubeconfig["contexts"][0]["name"]
+
+    # Find the context in the kubeconfig file data.
+
+    context_data = None
+
+    for item in kubeconfig["contexts"]:
+        if item["name"] == context:
+            context_data = item
+            break
+
+    if context_data is None:
+        raise ValueError(f"Context {context} not found in kubeconfig file.")
+
+    # Find the cluster and user data for the context.
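+    # If the named cluster or user entry is absent, the corresponding value
+    # below stays None; per the docstring the input is assumed to be
+    # well-formed, e.g. already checked with verify_kubeconfig_format().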
+
+    cluster_data = None
+
+    for cluster in kubeconfig["clusters"]:
+        if cluster["name"] == context_data["context"]["cluster"]:
+            cluster_data = cluster
+            break
+
+    user_data = None
+
+    for user in kubeconfig["users"]:
+        if user["name"] == context_data["context"]["user"]:
+            user_data = user
+            break
+
+    # Construct a new kubeconfig file with only data relevant to the context.
+
+    kubeconfig = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "clusters": [cluster_data],
+        "contexts": [context_data],
+        "current-context": context_data["name"],
+        "users": [user_data],
+    }
+
+    return kubeconfig
+
+
+def create_connection_info_from_kubeconfig(config: dict) -> kopf.ConnectionInfo:
+    """Create kopf connection info from kubeconfig data."""
+
+    contexts = {}
+    clusters = {}
+    users = {}
+
+    current_context = config.get("current-context")
+
+    for item in config.get("contexts", []):
+        if item["name"] not in contexts:
+            contexts[item["name"]] = item.get("context") or {}
+
+    for item in config.get("clusters", []):
+        if item["name"] not in clusters:
+            clusters[item["name"]] = item.get("cluster") or {}
+
+    for item in config.get("users", []):
+        if item["name"] not in users:
+            users[item["name"]] = item.get("user") or {}
+
+    if current_context is None:
+        raise ValueError("Current context is not set in kubeconfig.")
+
+    if current_context not in contexts:
+        raise ValueError(f"Context {current_context} not found in kubeconfig.")
+
+    context = contexts[current_context]
+    cluster = clusters[context["cluster"]]
+    user = users[context["user"]]
+
+    provider_token = user.get("auth-provider", {}).get("config", {}).get("access-token")
+
+    return kopf.ConnectionInfo(
+        server=cluster.get("server"),
+        ca_data=cluster.get("certificate-authority-data"),
+        insecure=cluster.get("insecure-skip-tls-verify"),
+        certificate_data=user.get("client-certificate-data"),
+        private_key_data=user.get("client-key-data"),
+        username=user.get("username"),
+        password=user.get("password"),
+        token=user.get("token") or provider_token,
+        default_namespace=context.get("namespace"),
+    )
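As an aside, these helpers are intended to chain together when the service
runs in-cluster. A minimal sketch of that flow, assuming the standard
Kubernetes service account mount path and an illustrative cluster name
(neither is defined by this patch):

    from service.helpers.kubeconfig import (
        create_connection_info_from_kubeconfig,
        create_kubeconfig_from_access_token_secret,
        verify_kubeconfig_format,
    )

    # Assumed in-cluster mount containing "token" and "ca.crt" files.
    kubeconfig = create_kubeconfig_from_access_token_secret(
        directory="/var/run/secrets/kubernetes.io/serviceaccount",
        cluster_name="local-cluster",
    )

    verify_kubeconfig_format(kubeconfig)  # raises ValueError if malformed

    # Credentials kopf can use to talk to the local cluster.
    connection_info = create_connection_info_from_kubeconfig(kubeconfig)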
+ """ + + keys = key.split(".") + value = default + + for key in keys: + value = obj.get(key) + if value is None: + return default + + obj = value + + return value diff --git a/lookup-service/service/helpers/operator.py b/lookup-service/service/helpers/operator.py new file mode 100644 index 000000000..cf4e9762d --- /dev/null +++ b/lookup-service/service/helpers/operator.py @@ -0,0 +1,172 @@ +"""Base class and helper functions for kopf based operator.""" + +import asyncio +import contextlib +import logging +import threading +import time + +import aiohttp +import kopf + +from ..caches.clusters import ClusterConfig +from ..service import ServiceState +from .kubeconfig import create_connection_info_from_kubeconfig + +logger = logging.getLogger("educates") + + +class GenericOperator(threading.Thread): + """Base class for kopf based operator.""" + + def __init__( + self, + cluster_config: ClusterConfig, + *, + namespaces: str = None, + service_state: ServiceState + ) -> None: + """Initializes the operator.""" + + super().__init__() + + # Set the name of the operator and the namespaces to watch for + # resources. When the list of namespaces is empty, the operator will + # watch for resources cluster wide. + + self.cluster_config = cluster_config + self.namespaces = namespaces or [] + + # Set the state object for the operator. This is used to store the state + # of the operator across invocations. + + self.service_state = service_state + + # Create an operator registry to store the handlers for the operator. + # We need a distinct registry for each operator since we need to be able + # to run multiple operators in the same process with separate handlers. + + self.operator_registry = kopf.OperatorRegistry() + + # Create a stop flag to signal the operator to stop running. This is + # used to bridge between the kopf variable for stopping the operator + # and event required to stop the event loop for the operator. + + self._stop_flag = threading.Event() + + @property + def cluster_name(self): + """Return the name of the cluster the operator is managing.""" + + return self.cluster_config.name + + @property + def kubeconfig(self): + """Return the kubeconfig for the cluster the operator is managing.""" + + return self.cluster_config.kubeconfig + + def register_handlers(self) -> None: + """Register the handlers for the operator.""" + + raise NotImplementedError("Subclasses must implement this method.") + + def run(self) -> None: + """Starts the kopf operator in a separate event loop.""" + + # Register the login function for the operator. + + @kopf.on.login(registry=self.operator_registry) + def login_fn(**_) -> dict: + """Returns login credentials for the cluster calculated from the + configuration currently held in the cluster configuration cache.""" + + return create_connection_info_from_kubeconfig(self.kubeconfig) + + @kopf.on.cleanup() + async def cleanup_fn(**_) -> None: + """Cleanup function for operator.""" + + # Workaround for possible kopf bug, set stop flag. + + self._stop_flag.set() + + # Register the kopf handlers for this operator. + + self.register_handlers() + + # Determine if the operator should be run clusterwide or in specific + # namespaces. + + clusterwide = False + + if not self.namespaces: + clusterwide = True + + # Run the operator in a separate event loop, waiting for the stop flag + # to be set, at which point the operator will be stopped and this thread + # will exit. 
diff --git a/lookup-service/service/helpers/operator.py b/lookup-service/service/helpers/operator.py
new file mode 100644
index 000000000..cf4e9762d
--- /dev/null
+++ b/lookup-service/service/helpers/operator.py
@@ -0,0 +1,172 @@
+"""Base class and helper functions for kopf based operators."""
+
+import asyncio
+import contextlib
+import logging
+import threading
+import time
+from typing import List
+
+import aiohttp
+import kopf
+
+from ..caches.clusters import ClusterConfig
+from ..service import ServiceState
+from .kubeconfig import create_connection_info_from_kubeconfig
+
+logger = logging.getLogger("educates")
+
+
+class GenericOperator(threading.Thread):
+    """Base class for kopf based operators."""
+
+    def __init__(
+        self,
+        cluster_config: ClusterConfig,
+        *,
+        namespaces: List[str] = None,
+        service_state: ServiceState
+    ) -> None:
+        """Initializes the operator."""
+
+        super().__init__()
+
+        # Remember the cluster configuration and the namespaces to watch for
+        # resources. When the list of namespaces is empty, the operator will
+        # watch for resources cluster wide.
+
+        self.cluster_config = cluster_config
+        self.namespaces = namespaces or []
+
+        # Set the state object for the operator. This is used to store the
+        # state of the operator across invocations.
+
+        self.service_state = service_state
+
+        # Create an operator registry to store the handlers for the operator.
+        # We need a distinct registry for each operator since we need to be
+        # able to run multiple operators in the same process with separate
+        # handlers.
+
+        self.operator_registry = kopf.OperatorRegistry()
+
+        # Create a stop flag to signal the operator to stop running. This is
+        # used to bridge between the kopf variable for stopping the operator
+        # and the event object required to stop the event loop for the
+        # operator.
+
+        self._stop_flag = threading.Event()
+
+    @property
+    def cluster_name(self):
+        """Return the name of the cluster the operator is managing."""
+
+        return self.cluster_config.name
+
+    @property
+    def kubeconfig(self):
+        """Return the kubeconfig for the cluster the operator is managing."""
+
+        return self.cluster_config.kubeconfig
+
+    def register_handlers(self) -> None:
+        """Register the handlers for the operator."""
+
+        raise NotImplementedError("Subclasses must implement this method.")
+
+    def run(self) -> None:
+        """Starts the kopf operator in a separate event loop."""
+
+        # Register the login function for the operator.
+
+        @kopf.on.login(registry=self.operator_registry)
+        def login_fn(**_) -> dict:
+            """Returns login credentials for the cluster calculated from the
+            configuration currently held in the cluster configuration cache."""
+
+            return create_connection_info_from_kubeconfig(self.kubeconfig)
+
+        @kopf.on.cleanup()
+        async def cleanup_fn(**_) -> None:
+            """Cleanup function for operator."""
+
+            # Workaround for possible kopf bug, set stop flag.
+
+            self._stop_flag.set()
+
+        # Register the kopf handlers for this operator.
+
+        self.register_handlers()
+
+        # Determine if the operator should be run clusterwide or in specific
+        # namespaces.
+
+        clusterwide = not self.namespaces
+
+        # Run the operator in a separate event loop, waiting for the stop flag
+        # to be set, at which point the operator will be stopped and this
+        # thread will exit.
+
+        while not self._stop_flag.is_set():
+            event_loop = asyncio.new_event_loop()
+
+            asyncio.set_event_loop(event_loop)
+
+            logger.info("Starting managed cluster operator for %s.", self.cluster_name)
+
+            with contextlib.closing(event_loop):
+                try:
+                    event_loop.run_until_complete(
+                        kopf.operator(
+                            registry=self.operator_registry,
+                            clusterwide=clusterwide,
+                            namespaces=self.namespaces,
+                            memo=self.service_state,
+                            stop_flag=self._stop_flag,
+                        )
+                    )
+
+                except (
+                    aiohttp.client_exceptions.ClientConnectorError,
+                    aiohttp.client_exceptions.ClientConnectorCertificateError,
+                ):
+                    # If the operator exits due to a connection error it means
+                    # it could not connect to the cluster on initial startup.
+                    # After a short delay, the operator will be restarted. Note
+                    # that this only applies to the initial connection. If the
+                    # operator loses its connection to the cluster while
+                    # running, it will not be restarted; instead kopf will
+                    # continually attempt to reconnect to the cluster.
+                    #
+                    # TODO: Need to find a way to get a notification from kopf
+                    # that the watchers are failing so we can try to reconnect
+                    # or take some other action.
+
+                    logger.exception(
+                        "Connection error, restarting operator after delay."
+                    )
+
+                    time.sleep(5.0)
+
+    def cancel(self) -> None:
+        """Flags the kopf operator to stop."""
+
+        # Set the stop flag to stop the operator. This will cause the event
+        # loop to stop running and the operator thread to exit.
+
+        self._stop_flag.set()
+
+    def run_until_stopped(self, stopped: kopf.DaemonStopped) -> None:
+        """Run the operator until stopped."""
+
+        self.start()
+
+        while not stopped:
+            # We should be called from a traditional thread so it is safe to
+            # use a blocking sleep call.
+
+            time.sleep(1.0)
+
+        self.cancel()
+
+        self.join()
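To make the intended subclassing pattern concrete, here is a hypothetical
operator built on this base class; the resource name watched is purely
illustrative and not something this patch defines:

    class PortalWatcher(GenericOperator):
        """Hypothetical operator watching training portals on one cluster."""

        def register_handlers(self) -> None:
            @kopf.on.event(
                "trainingportals.training.educates.dev",
                registry=self.operator_registry,
            )
            def portal_event(event, **_):
                # React to training portal changes on the managed cluster.
                ...

Handlers must be registered against self.operator_registry so that several
such operators can coexist in one process, as the comments above explain.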
+ """ + + IN = "In" + NOT_IN = "NotIn" + EXISTS = "Exists" + DOES_NOT_EXIST = "DoesNotExist" + + +@dataclass +class LabelSelectorRequirement: + """Selector for matching Kubernetes resource objects by label express.""" + + key: str + operator: Operator + values: List[str] + + def match_resource(self, resource: Dict[str, Any]) -> bool: + """Check if a resource matches the selector.""" + + labels = xgetattr(resource, "metadata.labels", {}) + + value = labels.get(self.key) + + if self.operator == Operator.IN: + return value in self.values + elif self.operator == Operator.NOT_IN: + return value not in self.values + elif self.operator == Operator.EXISTS: + return value is not None + elif self.operator == Operator.DOES_NOT_EXIST: + return value is None + + return False + + +@dataclass +class LabelSelector: + """selector for matching Kubernetes resource objects by label.""" + + match_labels: Dict[str, str] + match_expressions: List[LabelSelectorRequirement] + + def match_resource(self, resource: Dict[str, Any]) -> bool: + """Check if a resource matches the selector.""" + + # First check if labels match by key/value pairs. If the set of labels + # is empty, then the selector will match all resources, but will still + # need to go on and check the label expressions. + + labels = xgetattr(resource, "metadata.labels", {}) + + if not all( + labels.get(key) == value for key, value in self.match_labels.items() + ): + return False + + # Now check list of label expressions. If this list is empty, then it + # will match all resources. + + return all(expr.match_resource(resource) for expr in self.match_expressions) + + +def convert_to_name_selector(name_selector_dict: dict) -> NameSelector: + """Converts a Kubernetes resource representation of a name selector to a + NameSelector object. + """ + + return NameSelector(match_names=name_selector_dict.get("matchNames", [])) + + +def convert_to_label_selector(label_selector_dict: dict) -> LabelSelector: + """Converts a Kubernetes resource representation of a label selector to a + LabelSelector object. + """ + + match_labels = label_selector_dict.get("matchLabels", {}) + + match_expressions_data = label_selector_dict.get("matchExpressions", []) + + match_expressions = [ + LabelSelectorRequirement( + key=expr["key"], operator=expr["operator"], values=expr.get("values") + ) + for expr in match_expressions_data + ] + + return LabelSelector(match_labels=match_labels, match_expressions=match_expressions) + + +@dataclass +class ResourceSelector: + """Selectors for matching Kubernetes resource objects.""" + + name_selector: NameSelector + label_selector: LabelSelector + + def __init__(self, selector: Any) -> None: + self.name_selector = convert_to_name_selector(selector.get("nameSelector", {})) + self.label_selector = convert_to_label_selector( + selector.get("labelSelector", {}) + ) + + def match_resource(self, resource: Dict[str, Any]) -> bool: + """Check if a resource matches the selector.""" + + return self.name_selector.match_resource( + resource + ) and self.label_selector.match_resource(resource) diff --git a/lookup-service/service/main.py b/lookup-service/service/main.py new file mode 100644 index 000000000..625a83487 --- /dev/null +++ b/lookup-service/service/main.py @@ -0,0 +1,251 @@ +"""Main entry point for the lookup service. 
diff --git a/lookup-service/service/main.py b/lookup-service/service/main.py
new file mode 100644
index 000000000..625a83487
--- /dev/null
+++ b/lookup-service/service/main.py
@@ -0,0 +1,251 @@
+"""Main entry point for the lookup service. This module starts the kopf
+operator framework and the aiohttp server for handling REST API requests."""
+
+import asyncio
+import contextlib
+import logging
+import os
+import signal
+import threading
+
+import aiohttp
+import kopf
+import pykube
+
+from .caches.databases import client_database, cluster_database, tenant_database
+from .handlers import clients as _  # pylint: disable=unused-import
+from .handlers import clusters as _  # pylint: disable=unused-import
+from .handlers import tenants as _  # pylint: disable=unused-import
+from .routes import register_routes
+from .service import ServiceState
+
+
+# Set up logging for the educates operator.
+
+logging.getLogger("kopf.activities.probe").setLevel(logging.WARNING)
+logging.getLogger("kopf.objects").setLevel(logging.WARNING)
+
+logger = logging.getLogger("educates")
+
+
+# Namespace to monitor for configuration resources.
+
+OPERATOR_NAMESPACE = os.getenv("OPERATOR_NAMESPACE", "educates-config")
+
+# Register the operator handlers for the training platform operator.
+#
+# TODO: These handler registrations are done against the global kopf registry
+# and thus will not apply to secondary operator instances which are created in
+# separate threads later as they will use their own registry. This means
+# liveness probes aren't currently checking access to secondary clusters. Also
+# need to check whether settings are being applied to the secondary operator
+# instances.
+
+
+@kopf.on.startup()
+def configure(settings: kopf.OperatorSettings, **_) -> None:
+    """Configures the kopf operator settings."""
+
+    settings.posting.level = logging.ERROR
+    settings.watching.connect_timeout = 1 * 60
+    settings.watching.server_timeout = 5 * 60
+    settings.watching.client_timeout = settings.watching.server_timeout + 10
+
+
+@kopf.on.login()
+def login_fn(**kwargs) -> dict:
+    """Returns login credentials to be used by the kopf operator framework
+    using the pykube library, so that the operator framework uses the same
+    means of getting credentials as the pykube library."""
+
+    return kopf.login_via_pykube(**kwargs)
+
+
+@kopf.on.probe(id="api")
+def check_api_access(**_) -> None:
+    """Checks whether we can access the Kubernetes API for the liveness
+    probe. Kopf handles the response to the liveness probe based on the
+    result of this function, and also performs its own basic checks to
+    determine whether the operator is still running and able to process
+    events."""
+
+    try:
+        api = pykube.HTTPClient(pykube.KubeConfig.from_env())
+        pykube.Namespace.objects(api).get(name="default")
+
+    except pykube.exceptions.KubernetesError:
+        logger.error("Failed liveness probe request to Kubernetes API.")
+
+        raise
+
+
+# Process variables and shutdown handling. Signal handlers run in the main
+# thread so we need to use global event objects to signal the kopf framework
+# and HTTP server, which run in separate threads, to stop processing.
+
+_kopf_main_process_thread = None  # pylint: disable=invalid-name
+_kopf_main_event_loop = None  # pylint: disable=invalid-name
+
+_aiohttp_main_process_thread = None  # pylint: disable=invalid-name
+_aiohttp_main_event_loop = None  # pylint: disable=invalid-name
+
+_shutdown_server_process_flag = threading.Event()
+
+
+def shutdown_server_process(signum: int, *_) -> None:
+    """Signal handler for shutting down the server process.
This will set the + stop flag for the kopf framework and HTTP server to stop processing.""" + + logger.info("Signal handler called with signal %s.", signum) + + if _kopf_main_event_loop: + _shutdown_server_process_flag.set() + + +def register_signal_handlers() -> None: + """Registers signal handlers for the server process. This will allow the + server process to be shutdown cleanly when a signal is received.""" + + signal.signal(signal.SIGINT, shutdown_server_process) + signal.signal(signal.SIGTERM, shutdown_server_process) + + +@kopf.on.cleanup() +async def cleanup_fn(**_) -> None: + """Cleanup function for the operator.""" + + # This is a workaround for a possible bug in kopf where the cleanup function + # isn't being called when the operator is stopped. This sets the stop flag + # for the operator to stop processing again. This may no longer be required. + + _shutdown_server_process_flag.set() + + +# Global data structures to be shared across the kopf operator and uvicorn +# server threads. + +service_state = ServiceState( + client_database=client_database, + tenant_database=tenant_database, + cluster_database=cluster_database, +) + + +def run_kopf() -> threading.Thread: + """Run kopf in a separate thread.""" + + def worker_thread(): + logger.info("Starting kopf framework main loop.") + + # Need to create and set the event loop since this isn't being + # called in the main thread. + + global _kopf_main_event_loop # pylint: disable=global-statement + + _kopf_main_event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_kopf_main_event_loop) + + # Run the kopf operator framework until the shutdown flag is set. + + with contextlib.closing(_kopf_main_event_loop): + _kopf_main_event_loop.run_until_complete( + kopf.operator( + clusterwide=False, + namespaces=[OPERATOR_NAMESPACE], + stop_flag=_shutdown_server_process_flag, + memo=service_state, + liveness_endpoint="http://0.0.0.0:8081/healthz", + ) + ) + + # Start the kopf operator framework in a separate thread. + + thread = threading.Thread(target=worker_thread) + thread.start() + + return thread + + +def run_aiohttp() -> threading.Thread: + """Run aiohttp in a separate thread.""" + + aiohttp_app = aiohttp.web.Application() + + aiohttp_app["service_state"] = service_state + + register_routes(aiohttp_app) + + runner = aiohttp.web.AppRunner(aiohttp_app) + + async def wait_for_process_shutdown() -> None: + """Wait for the server process to shutdown and then shutdown the HTTP + server.""" + + # Wait for the shutdown flag to be set. + + while not _shutdown_server_process_flag.is_set(): + await asyncio.sleep(1) + + # Shutdown the aiohttp server. + + await runner.cleanup() + + def worker_thread() -> None: + """Worker thread for running the HTTP server.""" + + # Need to create a separate event loop for the HTTP server since this + # isn't being called in the main thread. + + global _aiohttp_main_event_loop # pylint: disable=global-statement + + _aiohttp_main_event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_aiohttp_main_event_loop) + + async def run_app() -> None: + await runner.setup() + site = aiohttp.web.TCPSite(runner, "0.0.0.0", 8080) + await site.start() + + with contextlib.closing(_aiohttp_main_event_loop): + _aiohttp_main_event_loop.run_until_complete( + asyncio.gather(run_app(), wait_for_process_shutdown()) + ) + + # Start the HTTP server in a separate thread. + + thread = threading.Thread(target=worker_thread) + thread.start() + + return thread + + +# Main entry point for the educates operator. 
This will start the kopf operator +# framework and the HTTP server. + +if __name__ == "__main__": + + # Set up logging for the educates operator. + + logging.basicConfig(level=logging.INFO) + logger.setLevel(logging.INFO) + + # Suppress verbose logging from urllib3 if ever set general log level to + # more verbose setting. + + logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) + + # Register signal handlers for the server process. + + register_signal_handlers() + + # Start the kopf framework and HTTP server threads. + + _kopf_main_process_thread = run_kopf() + _aiohttp_main_process_thread = run_aiohttp() + + # Wait for the kopf framework and HTTP server threads to complete. This + # will block until the threads are finished which will only occur when the + # shutdown process signal is received. + + _kopf_main_process_thread.join() + _aiohttp_main_process_thread.join() diff --git a/lookup-service/service/routes/__init__.py b/lookup-service/service/routes/__init__.py new file mode 100644 index 000000000..f664a37c6 --- /dev/null +++ b/lookup-service/service/routes/__init__.py @@ -0,0 +1,22 @@ +"""Handlers for HTTP API endpoints.""" + +from aiohttp import web + +from . import authnz, clients, clusters, portals, tenants, workshops + + +def register_routes(app: web.Application) -> None: + """Register the HTTP API routes with the application.""" + + # Register authentication and authorization middleware/routes. + + app.middlewares.extend(authnz.middlewares) + app.add_routes(authnz.routes) + + # Register the routes for the different parts of the service. + + app.add_routes(clients.routes) + app.add_routes(clusters.routes) + app.add_routes(portals.routes) + app.add_routes(tenants.routes) + app.add_routes(workshops.routes) diff --git a/lookup-service/service/routes/authnz.py b/lookup-service/service/routes/authnz.py new file mode 100644 index 000000000..2a012c387 --- /dev/null +++ b/lookup-service/service/routes/authnz.py @@ -0,0 +1,212 @@ +"""HTTP API handlers and decorators for controlling access to the REST API. +""" + +import datetime +from typing import Callable + +import jwt +from aiohttp import web + +from ..config import jwt_token_secret + +TOKEN_EXPIRATION = 72 # Expiration in hours. + + +def generate_client_token(username: str, uid: str) -> dict: + """Generate a JWT token for the client. The token will be set to expire and + will need to be renewed. The token will contain the username and the unique + identifier for the client.""" + + expires_at = int( + ( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(hours=TOKEN_EXPIRATION) + ).timestamp() + ) + + jwt_token = jwt.encode( + {"sub": username, "jti": uid, "exp": expires_at}, + jwt_token_secret(), + algorithm="HS256", + ) + + return { + "access_token": jwt_token, + "token_type": "Bearer", + "expires_at": expires_at, + } + + +def decode_client_token(token: str) -> dict: + """Decode the client token and return the decoded token. If the token is + invalid, an exception will be raised.""" + + return jwt.decode(token, jwt_token_secret(), algorithms=["HS256"]) + + +@web.middleware +async def jwt_token_middleware( + request: web.Request, handler: Callable[..., web.Response] +) -> web.Response: + """Extract and decode the JWT token from the Authorization header, if + present. Store the decoded details in the request object for later use by + decorators on the individual request handlers that need to authenticate the + client and check for required authorization. 
+ """ + + # Extract the Authorization header from the request if present. + + authorization = request.headers.get("Authorization") + + if authorization: + # Check if the Authorization header is a Bearer token. + + parts = authorization.split() + + if len(parts) != 2: + return web.Response(text="Invalid Authorization header", status=400) + + if parts[0].lower() != "bearer": + return web.Response(text="Invalid Authorization header", status=400) + + # Decode the JWT token passed in the Authorization header. + + try: + token = parts[1] + decoded_token = decode_client_token(token) + except jwt.ExpiredSignatureError: + return web.Response(text="JWT token has expired", status=403) + except jwt.InvalidTokenError: + return web.Response(text="JWT token is invalid", status=403) + + # Store the decoded token in the request object for later use. + + request["jwt_token"] = decoded_token + request["client_name"] = decoded_token["sub"] + + # Continue processing the request. + + return await handler(request) + + +def login_required(handler: Callable[..., web.Response]) -> web.Response: + """Decorator to verify that client is logged in to the service.""" + + async def wrapper(request: web.Request) -> web.Response: + # Check if the decoded JWT token is present in the request object. + + if "jwt_token" not in request: + return web.Response(text="JWT token not supplied", status=400) + + decoded_token = request["jwt_token"] + + # Check the client database for the client by the name of the client + # taken from the JWT token subject. Then check if the identity of the + # client is still the same as the one recorded in the JWT token. + + service_state = request.app["service_state"] + client_database = service_state.client_database + + client = client_database.get_client(decoded_token["sub"]) + + if not client: + return web.Response(text="Client not found", status=403) + + if not client.validate_identity(decoded_token["jti"]): + return web.Response(text="Client identity not valid", status=403) + + # Continue processing the request. + + return await handler(request) + + return wrapper + + +def roles_accepted( + *roles: str, +) -> Callable[[Callable[..., web.Response]], web.Response]: + """Decorator to check that the client has access to the endpoint by + confirming that is has any role required by the endpoint for access.""" + + def decorator(handler: Callable[..., web.Response]) -> web.Response: + async def wrapper(request: web.Request) -> web.Response: + # Check if the decoded JWT token is present in the request object. + + if "jwt_token" not in request: + return web.Response(text="JWT token not supplied", status=400) + + decoded_token = request["jwt_token"] + + # Lookup the client by the name of the client taken from the JWT + # token subject. + + service_state = request.app["service_state"] + client_database = service_state.client_database + + client_name = decoded_token["sub"] + client = client_database.get_client(client_name) + + if not client: + return web.Response(text="Client not found", status=403) + + # Check if the client has one of the required roles. + + matched_roles = client.has_required_role(*roles) + + if not matched_roles: + return web.Response(text="Client access not permitted", status=403) + + request["remote_client"] = client + request["client_roles"] = matched_roles + + # Continue processing the request. 
+
+            return await handler(request)
+
+        return wrapper
+
+    return decorator
+
+
+async def api_login_handler(request: web.Request) -> web.Response:
+    """Login handler for accessing the service. Validates the username and
+    password provided in the request and returns a JWT token if the
+    credentials are valid."""
+
+    # Extract the username and password from the request POST data.
+
+    data = await request.json()
+
+    username = data.get("username")
+    password = data.get("password")
+
+    if username is None:
+        return web.Response(text="No username provided", status=400)
+
+    if password is None:
+        return web.Response(text="No password provided", status=400)
+
+    # Check if the password is correct for the username.
+
+    service_state = request.app["service_state"]
+    client_database = service_state.client_database
+
+    uid = client_database.authenticate_client(username, password)
+
+    if not uid:
+        return web.Response(text="Invalid username/password", status=401)
+
+    # Generate a JWT token for the user and return it. The response is
+    # bundled with the token type and expiration time so they can be used
+    # by the client without needing to parse the actual JWT token.
+
+    token = generate_client_token(username, uid)
+
+    return web.json_response(token)
+
+
+# Set up the middleware and routes for authentication and authorization.
+
+middlewares = [jwt_token_middleware]
+
+routes = [web.post("/login", api_login_handler)]
diff --git a/lookup-service/service/routes/clients.py b/lookup-service/service/routes/clients.py
new file mode 100644
index 000000000..8d91b2683
--- /dev/null
+++ b/lookup-service/service/routes/clients.py
@@ -0,0 +1,55 @@
+"""REST API handlers for client management."""
+
+from aiohttp import web
+
+from .authnz import login_required, roles_accepted
+
+
+@login_required
+@roles_accepted("admin")
+async def api_get_v1_clients(request: web.Request) -> web.Response:
+    """Returns a list of clients which can access the service."""
+
+    service_state = request.app["service_state"]
+    client_database = service_state.client_database
+
+    data = {
+        "clients": [
+            {"name": client.name, "roles": client.roles, "tenants": client.tenants}
+            for client in client_database.get_clients()
+        ]
+    }
+
+    return web.json_response(data)
+
+
+@login_required
+@roles_accepted("admin")
+async def api_get_v1_clients_details(request: web.Request) -> web.Response:
+    """Returns details for the specified client."""
+
+    client_name = request.match_info["client"]
+
+    service_state = request.app["service_state"]
+    client_database = service_state.client_database
+
+    client = client_database.get_client(client_name)
+
+    if not client:
+        return web.Response(text="Client not available", status=403)
+
+    details = {
+        "name": client.name,
+        "roles": client.roles,
+        "tenants": client.tenants,
+    }
+
+    return web.json_response(details)
+
+
+# Set up the routes for the client management API.
+ +routes = [ + web.get("/api/v1/clients", api_get_v1_clients), + web.get("/api/v1/clients/{client}", api_get_v1_clients_details), +] diff --git a/lookup-service/service/routes/clusters.py b/lookup-service/service/routes/clusters.py new file mode 100644 index 000000000..742f8ea78 --- /dev/null +++ b/lookup-service/service/routes/clusters.py @@ -0,0 +1,314 @@ +"""REST API handlers for cluster management.""" + +import yaml +from aiohttp import web + +from .authnz import login_required, roles_accepted + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters(request: web.Request) -> web.Response: + """Returns a list of clusters available to the user.""" + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + data = { + "clusters": [ + {"name": cluster.name, "labels": cluster.labels} + for cluster in cluster_database.get_clusters() + ] + } + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_details(request: web.Request) -> web.Response: + """Returns details for the specified cluster.""" + + cluster_name = request.match_info["cluster"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + details = { + "name": cluster.name, + "labels": cluster.labels, + } + + return web.json_response(details) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_kubeconfig(request: web.Request) -> web.Response: + """Returns a kubeconfig file for the specified cluster.""" + + cluster_name = request.match_info["cluster"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + kubeconfig = yaml.dump(cluster.kubeconfig) + + return web.Response(text=kubeconfig) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_portals(request: web.Request) -> web.Response: + """Returns a list of portals for the specified cluster.""" + + cluster_name = request.match_info["cluster"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + data = { + "portals": [ + { + "name": portal.name, + "uid": portal.uid, + "generation": portal.generation, + "labels": portal.labels, + "url": portal.url, + "capacity": portal.capacity, + "allocated": portal.allocated, + "phase": portal.phase, + } + for portal in cluster.get_portals() + ] + } + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_portals_environments( + request: web.Request, +) -> web.Response: + """Returns a list of environments for a portal running on a cluster.""" + + cluster_name = request.match_info["cluster"] + portal_name = request.match_info["portal"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + portal = cluster.get_portal(portal_name) + + if not portal: + return 
web.Response(text="Portal not available", status=403) + + environments = portal.get_environments() + + data = { + "environments": [ + { + "name": environment.name, + "generation": environment.generation, + "workshop": environment.workshop, + "title": environment.title, + "description": environment.description, + "labels": environment.labels, + "capacity": environment.capacity, + "reserved": environment.reserved, + "allocated": environment.allocated, + "available": environment.available, + "phase": environment.phase, + } + for environment in environments + ] + } + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_portals_environments_sessions( + request: web.Request, +) -> web.Response: + """Returns a list of workshop sessions for an environment running on portal.""" + + cluster_name = request.match_info["cluster"] + portal_name = request.match_info["portal"] + environment_name = request.match_info["environment"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + portal = cluster.get_portal(portal_name) + + if not portal: + return web.Response(text="Portal not available", status=403) + + environment = portal.get_environment(environment_name) + + if not environment: + return web.Response(text="Environment not available", status=403) + + sessions = environment.get_sessions() + + data = { + "sessions": [ + { + "name": session.name, + "generation": session.generation, + "environment": session.environment.name, + "workshop": session.environment.workshop, + "phase": session.phase, + "user": session.user, + } + for session in sessions + ] + } + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_portals_environments_users( + request: web.Request, +) -> web.Response: + """Returns a list of users for an environment running on portal.""" + + cluster_name = request.match_info["cluster"] + portal_name = request.match_info["portal"] + environment_name = request.match_info["environment"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + portal = cluster.get_portal(portal_name) + + if not portal: + return web.Response(text="Portal not available", status=403) + + environment = portal.get_environment(environment_name) + + if not environment: + return web.Response(text="Environment not available", status=403) + + sessions = environment.get_sessions() + + users = set() + + for session in sessions: + if session.user not in users: + users.add(session.user) + + data = {"users": list(users)} + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_clusters_portals_environments_users_sessions( + request: web.Request, +) -> web.Response: + """Returns a list of workshop sessions for a user in an environment running on portal.""" + + cluster_name = request.match_info["cluster"] + portal_name = request.match_info["portal"] + environment_name = request.match_info["environment"] + user_name = request.match_info["user"] + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + cluster = 
cluster_database.get_cluster(cluster_name) + + if not cluster: + return web.Response(text="Cluster not available", status=403) + + portal = cluster.get_portal(portal_name) + + if not portal: + return web.Response(text="Portal not available", status=403) + + environment = portal.get_environment(environment_name) + + if not environment: + return web.Response(text="Environment not available", status=403) + + sessions = environment.get_sessions() + + data = { + "sessions": [ + { + "name": session.name, + "generation": session.generation, + "environment": session.environment.name, + "workshop": session.environment.workshop, + "phase": session.phase, + "user": session.user, + } + for session in sessions + if session.user == user_name + ] + } + + return web.json_response(data) + + +# Set up the routes for the cluster management API. + +routes = [ + web.get("/api/v1/clusters", api_get_v1_clusters), + web.get("/api/v1/clusters/{cluster}", api_get_v1_clusters_details), + web.get("/api/v1/clusters/{cluster}/kubeconfig", api_get_v1_clusters_kubeconfig), + web.get("/api/v1/clusters/{cluster}/portals", api_get_v1_clusters_portals), + web.get( + "/api/v1/clusters/{cluster}/portals/{portal}/environments", + api_get_v1_clusters_portals_environments, + ), + web.get( + "/api/v1/clusters/{cluster}/portals/{portal}/environments/{environment}/sessions", + api_get_v1_clusters_portals_environments_sessions, + ), + web.get( + "/api/v1/clusters/{cluster}/portals/{portal}/environments/{environment}/users", + api_get_v1_clusters_portals_environments_users, + ), + web.get( + "/api/v1/clusters/{cluster}/portals/{portal}/environments/{environment}/users/{user}/sessions", # pylint: disable=line-too-long + api_get_v1_clusters_portals_environments_users_sessions, + ), +] diff --git a/lookup-service/service/routes/portals.py b/lookup-service/service/routes/portals.py new file mode 100644 index 000000000..5508b6e56 --- /dev/null +++ b/lookup-service/service/routes/portals.py @@ -0,0 +1,44 @@ +"""REST API handlers for portal management.""" + +from aiohttp import web + +from .authnz import login_required, roles_accepted + + +@login_required +@roles_accepted("admin") +async def api_get_v1_portals(request: web.Request) -> web.Response: + """Returns a list of portals available to the user.""" + + service_state = request.app["service_state"] + cluster_database = service_state.cluster_database + + portals = [] + + for cluster in cluster_database.get_clusters(): + for portal in cluster.get_portals(): + portals.append(portal) + + data = { + "portals": [ + { + "name": portal.name, + "uid": portal.uid, + "generation": portal.generation, + "labels": portal.labels, + "cluster": portal.cluster.name, + "url": portal.url, + "capacity": portal.capacity, + "allocated": portal.allocated, + "phase": portal.phase, + } + for portal in portals + ] + } + + return web.json_response(data) + + +# Set up the routes for the portal management API. 
+ +routes = [web.get("/api/v1/portals", api_get_v1_portals)] diff --git a/lookup-service/service/routes/tenants.py b/lookup-service/service/routes/tenants.py new file mode 100644 index 000000000..97c46173c --- /dev/null +++ b/lookup-service/service/routes/tenants.py @@ -0,0 +1,127 @@ +"""REST API handlers for tenant management.""" + +from aiohttp import web + +from .authnz import login_required, roles_accepted + + +def get_clients_mapped_to_tenant(client_database, tenant_name: str) -> int: + """Return the names of the clients mapped to the tenant.""" + + return [ + client.name + for client in client_database.get_clients() + if client.allowed_access_to_tenant(tenant_name) + ] + + +@login_required +@roles_accepted("admin") +async def api_get_v1_tenants(request: web.Request) -> web.Response: + """Returns a list of tenants.""" + + service_state = request.app["service_state"] + tenant_database = service_state.tenant_database + client_database = service_state.client_database + + data = { + "tenants": [ + { + "name": tenant.name, + "clients": get_clients_mapped_to_tenant(client_database, tenant.name), + } + for tenant in tenant_database.get_tenants() + ] + } + + return web.json_response(data) + + +@login_required +@roles_accepted("admin") +async def api_get_v1_tenants_details(request: web.Request) -> web.Response: + """Returns details for the specified tenant.""" + + tenant_name = request.match_info["tenant"] + + service_state = request.app["service_state"] + tenant_database = service_state.tenant_database + client_database = service_state.client_database + + tenant = tenant_database.get_tenant(tenant_name) + + if not tenant: + return web.Response(text="Tenant not available", status=403) + + details = { + "name": tenant.name, + "clients": get_clients_mapped_to_tenant(client_database, tenant.name), + } + + return web.json_response(details) + + +@login_required +@roles_accepted("admin", "tenant") +async def api_get_v1_tenants_workshops(request: web.Request) -> web.Response: + """Returns a list of workshops for the specified tenant.""" + + service_state = request.app["service_state"] + tenant_database = service_state.tenant_database + client_database = service_state.client_database + + # Grab tenant name from path parameters. If the client has the tenant role + # they can only access tenants they are mapped to. + + tenant_name = request.match_info["tenant"] + + if not tenant_name: + return web.Response(text="Missing tenant name", status=400) + + client_name = request["client_name"] + client_roles = request["client_roles"] + + if "tenant" in client_roles: + client = client_database.get_client(client_name) + + if not client: + return web.Response(text="Client not found", status=403) + + if not client.allowed_access_to_tenant(tenant_name): + return web.Response(text="Client access not permitted", status=403) + + # Work out the set of portals accessible for this tenant. + + tenant = tenant_database.get_tenant(tenant_name) + + if not tenant: + return web.Response(text="Tenant not available", status=403) + + accessible_portals = tenant.portals_which_are_accessible() + + # Generate the list of workshops available to the user for this tenant which + # are in a running state. We need to eliminate any duplicates as a workshop + # may be available through multiple training portals. We use the title and + # description from the last found so we expect these to be consistent. 
+ + workshops = {} + + for portal in accessible_portals: + for environment in portal.get_running_environments(): + workshops[environment.workshop] = { + "name": environment.workshop, + "title": environment.title, + "description": environment.description, + "labels": environment.labels, + } + + return web.json_response({"workshops": list(workshops.values())}) + + +# Set up the routes for the tenant management API. + +routes = [ + web.get("/api/v1/tenants", api_get_v1_tenants), + web.get("/api/v1/tenants/{tenant}", api_get_v1_tenants_details), + web.get("/api/v1/tenants/{tenant}/workshops", api_get_v1_tenants_workshops), +] diff --git a/lookup-service/service/routes/workshops.py b/lookup-service/service/routes/workshops.py new file mode 100644 index 000000000..4a5071f64 --- /dev/null +++ b/lookup-service/service/routes/workshops.py @@ -0,0 +1,334 @@ +"""REST API handlers for workshop requests.""" + +import logging + +from aiohttp import web + +from .authnz import login_required, roles_accepted + +logger = logging.getLogger("educates") + + +@login_required +@roles_accepted("admin", "tenant") +async def api_get_v1_workshops(request: web.Request) -> web.Response: + """Returns a list of workshops available.""" + + service_state = request.app["service_state"] + tenant_database = service_state.tenant_database + client_database = service_state.client_database + + # Get the tenant name from the query parameters. This is required when + # the client role is "tenant". + + tenant_name = request.query.get("tenant") + + client_name = request["client_name"] + client_roles = request["client_roles"] + + if "tenant" in client_roles: + if not tenant_name: + logger.warning("Missing tenant name in request from client %r.", client_name) + + return web.Response(text="Missing tenant name", status=400) + + client = client_database.get_client(client_name) + + if not client: + return web.Response(text="Client not found", status=403) + + if not client.allowed_access_to_tenant(tenant_name): + return web.Response(text="Client access not permitted", status=403) + + # Work out the set of portals accessible by the specified tenant. + + if tenant_name: + tenant = tenant_database.get_tenant(tenant_name) + + if not tenant: + return web.Response(text="Tenant not available", status=403) + + accessible_portals = tenant.portals_which_are_accessible() + + else: + # Collect list of portals from all the clusters. + + accessible_portals = [] + + cluster_database = service_state.cluster_database + + for cluster in cluster_database.get_clusters(): + accessible_portals.extend(cluster.get_portals()) + + # Generate the list of workshops available to the user for this tenant which + # are in a running state. We need to eliminate any duplicates as a workshop + # may be available through multiple training portals. We use the title and + # description from the last found so we expect these to be consistent. 
+ + workshops = {} + + for portal in accessible_portals: + for environment in portal.get_running_environments(): + workshops[environment.workshop] = { + "name": environment.workshop, + "title": environment.title, + "description": environment.description, + "labels": environment.labels, + } + + return web.json_response({"workshops": list(workshops.values())}) + + +@login_required +@roles_accepted("admin", "tenant") +async def api_post_v1_workshops(request: web.Request) -> web.Response: + """Returns a workshop session for the specified tenant and workshop.""" + + data = await request.json() + + client_name = request["client_name"] + + tenant_name = data.get("tenantName") + + # TODO: Need to see how can use the action ID supplied by the client. At the + # moment we just log it. + + user_id = data.get("clientUserId") or "" + action_id = data.get("clientActionId") or "" # pylint: disable=unused-variable + index_url = data.get("clientIndexUrl") or "" + + workshop_name = data.get("workshopName") + parameters = data.get("workshopParams", []) + + logger.info( + "Workshop request from client %r for tenant %r, workshop %r, user %r, action %r", + client_name, + tenant_name, + workshop_name, + user_id, + action_id, + ) + + if not tenant_name: + logger.warning("Missing tenant name in request from client %r.", client_name) + + return web.Response(text="Missing tenantName", status=400) + + if not workshop_name: + logger.warning("Missing workshop name in request from client %r.", client_name) + + return web.Response(text="Missing workshopName", status=400) + + # Check that client is allowed access to this tenant. + + client = request["remote_client"] + + if not client.allowed_access_to_tenant(tenant_name): + logger.warning( + "Client %r not allowed access to tenant %r", client_name, tenant_name + ) + + return web.Response(text="Client not allowed access to tenant", status=403) + + # Find the portals accessible to the tenant which hosts the workshop. + + service_state = request.app["service_state"] + tenant_database = service_state.tenant_database + + tenant = tenant_database.get_tenant(tenant_name) + + if not tenant: + logger.error("Configuration for tenant %r could not be found", tenant_name) + + return web.Response(text="Tenant not available", status=403) + + # Get the list of portals hosting the workshop and calculate the subset + # that are accessible to the tenant. + + accessible_portals = tenant.portals_which_are_accessible() + + selected_portals = [] + + for portal in accessible_portals: + if portal.hosts_workshop(workshop_name): + selected_portals.append(portal) + + # If there are no resulting portals, then the workshop is not available to + # the tenant. + + if not selected_portals: + logger.warning( + "Workshop %s requested by client %r not available to tenant %r", + workshop_name, + client_name, + tenant_name, + ) + + return web.Response(text="Workshop not available", status=503) + + # If a user ID is supplied, check each of the portals to see if this user + # already has a workshop session for this workshop. + + if user_id: + for portal in selected_portals: + session = portal.find_existing_workshop_session_for_user( + user_id, workshop_name + ) + + if session: + data = await session.reacquire_workshop_session(index_url) + + if data: + data["tenantName"] = tenant_name + return web.json_response(data) + + # Find the set of workshop environments for the specified workshop that are + # in a running state. If there are no such environments, then the workshop + # is not available. 
+ + environments = [] + + for portal in selected_portals: + for environment in portal.get_running_environments(): + if environment.workshop == workshop_name: + environments.append(environment) + + if not environments: + logger.warning( + "Workshop %r requested by client %r not available", + workshop_name, + client_name, + ) + + return web.Response(text="Workshop not available", status=503) + + # Choose the best workshop environment to allocate a session from based on + # available capacity of the workshop environment and the portal hosting it. + + environment = choose_best_workshop_environment(environments) + + if environment: + data = await environment.request_workshop_session( + user_id, parameters, index_url + ) + + if data: + data["tenantName"] = tenant_name + return web.json_response(data) + + # If we get here, then we don't believe there is any available capacity for + # creating a workshop session. Even so, attempt to create a session against + # any workshop environment, just make sure that we don't try and use the + # same workshop environment we just tried to get a session from. + + if environment: + environments.remove(environment) + + if not environments: + logger.warning( + "Workshop %r requested by client %r not available", + workshop_name, + client_name, + ) + + return web.Response(text="Workshop not available", status=503) + + environment = environments[0] + + data = await environment.request_workshop_session(user_id, parameters, index_url) + + if data: + data["tenantName"] = tenant_name + return web.json_response(data) + + # If we get here, then we don't believe there is any available capacity for + # creating a workshop session. + + logger.warning( + "Workshop %r requested by client %r not available", workshop_name, client_name + ) + + return web.Response(text="Workshop not available", status=503) + + +def choose_best_workshop_environment(environments): + """Choose the best workshop environment to allocate a session from.""" + + if len(environments) == 1: + return environments[0] + + # First discard any workshop environment which have no more space available. + + environments = [ + environment + for environment in environments + if environment.capacity and (environment.capacity - environment.allocated > 0) + ] + + # Also discard any workshop environments where the portal as a whole has + # no more capacity. + + environments = [ + environment + for environment in environments + if environment.portal.capacity + and (environment.portal.capacity - environment.portal.allocated > 0) + ] + + # If there is only one workshop environment left, return it. + + if len(environments) == 1: + return environments[0] + + # If there are no workshop environments left, return None. + + if len(environments) == 0: + return None + + # If there are multiple workshop environments left, starting with the portal + # with the most capacity remaining, look at number of reserved sessions + # available for a workshop environment and if any, allocate it from the + # workshop environment with the most. In other words, sort based on the + # number of reserved sessions and if the first in the resulting list has + # reserved sessions, use that. 
+ + def score_based_on_reserved_sessions(environment): + return ( + environment.portal.capacity + and (environment.portal.capacity - environment.portal.allocated) + or 1, + environment.available, + ) + + environments.sort(key=score_based_on_reserved_sessions, reverse=True) + + if environments[0].available > 0: + return environments[0] + + # If there are no reserved sessions available, starting with the portal + # with the most capacity remaining, look at the available capacity within + # the workshop environment. In other words, sort based on the number of free + # spots in the workshop environment and if the first in the resulting list + # has free spots, use that. + + def score_based_on_available_capacity(environment): + return ( + environment.portal.capacity + and (environment.portal.capacity - environment.portal.allocated) + or 1, + environment.capacity + and (environment.capacity - environment.allocated) + or 1, + ) + + environments.sort(key=score_based_on_available_capacity, reverse=True) + + return environments[0] + + +# Set up the routes for the workshop management API. + +routes = [ + web.get("/api/v1/workshops", api_get_v1_workshops), + web.post("/api/v1/workshops", api_post_v1_workshops), +] diff --git a/lookup-service/service/service.py b/lookup-service/service/service.py new file mode 100644 index 000000000..5d5aff658 --- /dev/null +++ b/lookup-service/service/service.py @@ -0,0 +1,21 @@ +"""Custom operator context object for the service.""" + +from dataclasses import dataclass + +from .caches.databases import ( + ClientDatabase, + TenantDatabase, + ClusterDatabase, +) + + +@dataclass +class ServiceState: + """Custom operator context object for the service.""" + + client_database: ClientDatabase + tenant_database: TenantDatabase + cluster_database: ClusterDatabase + + def __copy__(self) -> "ServiceState": + return self diff --git a/lookup-service/start-service.sh b/lookup-service/start-service.sh new file mode 100755 index 000000000..37bdaccf0 --- /dev/null +++ b/lookup-service/start-service.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +# Start the service. + +exec python -m service.main
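For completeness, a hypothetical end-to-end exchange with the REST API defined
by this patch; the host, credentials, tenant and workshop names are all
illustrative, and the requests library is used here purely for the sketch:

    import requests

    BASE = "http://lookup-service.example.com"  # assumed service address

    # Authenticate to obtain a bearer token (credentials are illustrative).
    token = requests.post(
        f"{BASE}/login", json={"username": "portal-client", "password": "secret"}
    ).json()["access_token"]

    headers = {"Authorization": f"Bearer {token}"}

    # Request a workshop session for a tenant's user, using the field names
    # read by api_post_v1_workshops() above.
    response = requests.post(
        f"{BASE}/api/v1/workshops",
        headers=headers,
        json={
            "tenantName": "example-tenant",
            "workshopName": "lab-k8s-fundamentals",
            "clientUserId": "user-123",
            "clientIndexUrl": "https://hub.example.com",
        },
    )

    print(response.json())  # session details, including "tenantName"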