From 9e7120abd374bd2d4b160ed32d8d013011a202ab Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Wed, 4 Sep 2024 23:39:01 +0200 Subject: [PATCH] feat: add cloud storage via rclone --- Makefile | 2 +- bases/renku_data_services/data_api/app.py | 1 + .../api/classes/cloud_storage/__init__.py | 8 +- .../notebooks/api/schemas/cloud_storage.py | 121 +++++++++++------- .../notebooks/blueprints.py | 76 +++++++++-- .../notebooks/cr_amalthea_session.py | 53 ++++++-- .../renku_data_services/notebooks/crs.py | 5 +- 7 files changed, 194 insertions(+), 72 deletions(-) diff --git a/Makefile b/Makefile index 05751f088..3f220d2e0 100644 --- a/Makefile +++ b/Makefile @@ -161,5 +161,5 @@ install_amaltheas: ## Installs both version of amalthea in the. NOTE: It uses t # TODO: Add the version variables from the top of the file here when the charts are fully published amalthea_schema: ## Updates generates pydantic classes from CRDs - curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/fix-missing-status/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg + curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/feat-add-cloud-storage/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/main/controller/crds/jupyter_server.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_jupyter_server.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg diff --git a/bases/renku_data_services/data_api/app.py b/bases/renku_data_services/data_api/app.py index aa3cfd116..a51de6989 100644 --- a/bases/renku_data_services/data_api/app.py +++ b/bases/renku_data_services/data_api/app.py @@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: nb_config=config.nb_config, internal_gitlab_authenticator=config.gitlab_authenticator, git_repo=config.git_repositories_repo, + rp_repo=config.rp_repo, ) notebooks_new = NotebooksNewBP( name="notebooks", diff --git a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py index 015653284..a66b2728d 100644 --- a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py +++ b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py @@ -6,17 +6,15 @@ class 
ICloudStorageRequest(Protocol): """The abstract class for cloud storage.""" - exists: bool mount_folder: str - source_folder: str - bucket: str + source_path: str def get_manifest_patch( self, base_name: str, namespace: str, - labels: dict[str, str] = {}, - annotations: dict[str, str] = {}, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """The patches applied to a jupyter server to insert the storage in the session.""" ... diff --git a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py index 05b141c3c..5b848f8fa 100644 --- a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py +++ b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py @@ -2,15 +2,18 @@ from configparser import ConfigParser from io import StringIO -from pathlib import Path -from typing import Any, Optional, Self +from pathlib import PurePosixPath +from typing import Any, Final, Optional, Self +from kubernetes import client from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema from renku_data_services.base_models import APIUser from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest from renku_data_services.notebooks.config import _NotebooksConfig +_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization + class RCloneStorageRequest(Schema): """Request for RClone based storage.""" @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None: class RCloneStorage(ICloudStorageRequest): """RClone based storage.""" + pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName" + def __init__( self, source_path: str, @@ -60,7 +65,7 @@ async def storage_from_schema( user: APIUser, internal_gitlab_user: APIUser, project_id: int, - work_dir: Path, + work_dir: PurePosixPath, config: _NotebooksConfig, ) -> Self: """Create storage object from request.""" @@ -92,8 +97,73 @@ async def storage_from_schema( await config.storage_validator.validate_storage_configuration(configuration, source_path) return cls(source_path, configuration, readonly, mount_folder, name, config) + def pvc( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1PersistentVolumeClaim: + """The PVC for mounting cloud storage.""" + return client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}), + labels={"name": base_name} | (labels or {}), + ), + spec=client.V1PersistentVolumeClaimSpec( + access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"], + resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}), + storage_class_name=self.config.cloud_storage.storage_class, + ), + ) + + def volume_mount(self, base_name: str) -> client.V1VolumeMount: + """The volume mount for cloud storage.""" + return client.V1VolumeMount( + mount_path=self.mount_folder, + name=base_name, + read_only=self.readonly, + ) + + def volume(self, base_name: str) -> client.V1Volume: + """The volume entry for the statefulset specification.""" + return client.V1Volume( + name=base_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=base_name, read_only=self.readonly + ), + ) + + def secret( + self, + base_name: str, + 
namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1Secret: + """The secret containing the configuration for the rclone csi driver.""" + return client.V1Secret( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations=annotations, + labels={"name": base_name} | (labels or {}), + ), + string_data={ + "remote": self.name or base_name, + "remotePath": self.source_path, + "configData": self.config_string(self.name or base_name), + }, + ) + def get_manifest_patch( - self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {} + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """Get server manifest patch.""" patches = [] @@ -104,57 +174,22 @@ def get_manifest_patch( { "op": "add", "path": f"/{base_name}-pv", - "value": { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "spec": { - "accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"], - "resources": {"requests": {"storage": "10Gi"}}, - "storageClassName": self.config.cloud_storage.storage_class, - }, - }, + "value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)), }, { "op": "add", "path": f"/{base_name}-secret", - "value": { - "apiVersion": "v1", - "kind": "Secret", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "type": "Opaque", - "stringData": { - "remote": self.name or base_name, - "remotePath": self.source_path, - "configData": self.config_string(self.name or base_name), - }, - }, + "value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)), }, { "op": "add", "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-", - "value": { - "mountPath": self.mount_folder, - "name": base_name, - "readOnly": self.readonly, - }, + "value": _sanitize_for_serialization(self.volume_mount(base_name)), }, { "op": "add", "path": "/statefulset/spec/template/spec/volumes/-", - "value": { - "name": base_name, - "persistentVolumeClaim": { - "claimName": base_name, - "readOnly": self.readonly, - }, - }, + "value": _sanitize_for_serialization(self.volume(base_name)), }, ], } diff --git a/components/renku_data_services/notebooks/blueprints.py b/components/renku_data_services/notebooks/blueprints.py index 59460f33a..5faa1391b 100644 --- a/components/renku_data_services/notebooks/blueprints.py +++ b/components/renku_data_services/notebooks/blueprints.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from math import floor -from pathlib import Path +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse @@ -55,6 +55,7 @@ Authentication, AuthenticationType, Culling, + DataSource, ExtraContainer, ExtraVolume, ExtraVolumeMount, @@ -64,10 +65,12 @@ Resources, SecretAsVolume, SecretAsVolumeItem, - SecretRef, + SecretRefKey, + SecretRefWhole, Session, SessionEnvItem, Storage, + TlsSecret, ) from renku_data_services.notebooks.errors.intermittent import AnonymousUserPatchError, PVDisabledError from renku_data_services.notebooks.errors.programming import ProgrammingError @@ -84,6 +87,7 @@ from renku_data_services.project.db import ProjectRepository from renku_data_services.repositories.db import GitRepositoriesRepository from renku_data_services.session.db import 
SessionRepository +from renku_data_services.storage.db import StorageV2Repository @dataclass(kw_only=True) @@ -413,7 +417,7 @@ async def launch_notebook_helper( if lfs_auto_fetch is not None: parsed_server_options.lfs_auto_fetch = lfs_auto_fetch - image_work_dir = image_repo.image_workdir(parsed_image) or Path("/") + image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/") mount_path = image_work_dir / "work" server_work_dir = mount_path / gl_project_path @@ -428,7 +432,7 @@ async def launch_notebook_helper( cstorage.model_dump(), user=user, project_id=gl_project_id, - work_dir=server_work_dir.absolute(), + work_dir=server_work_dir, config=nb_config, internal_gitlab_user=internal_gitlab_user, ) @@ -772,6 +776,7 @@ class NotebooksNewBP(CustomBlueprint): project_repo: ProjectRepository session_repo: SessionRepository rp_repo: ResourcePoolRepository + storage_repo: StorageV2Repository def start(self) -> BlueprintFactoryResponse: """Start a session with the new operator.""" @@ -803,7 +808,7 @@ async def _handler( parsed_server_options = await self.nb_config.crc_validator.validate_class_storage( user, resource_class_id, body.disk_storage ) - work_dir = Path("/home/jovyan/work") + work_dir = environment.working_directory user_secrets: K8sUserSecrets | None = None # if body.user_secrets: # user_secrets = K8sUserSecrets( @@ -811,9 +816,41 @@ async def _handler( # user_secret_ids=body.user_secrets.user_secret_ids, # mount_path=body.user_secrets.mount_path, # ) - cloud_storage: list[RCloneStorage] = [] + cloud_storages_db = await self.storage_repo.get_storage( + user=user, project_id=project.id, include_secrets=True + ) + cloud_storage: dict[str, RCloneStorage] = { + str(s.storage_id): RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration.model_dump(mode="python"), + readonly=s.readonly, + config=self.nb_config, + name=s.name, + ) + for s in cloud_storages_db + } + cloud_storage_request: dict[str, RCloneStorage] = { + s.storage_id: RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration, + readonly=s.readonly, + config=self.nb_config, + name=None, + ) + for s in body.cloudstorage or [] + if s.storage_id is not None + } + # NOTE: Check the cloud storage in the request body and, if any match, + # then overwrite the project's cloud storages + # NOTE: Cloud storages in the session launch request body that are not from the DB are ignored + for csr_id, csr in cloud_storage_request.items(): + if csr_id in cloud_storage: + cloud_storage[csr_id] = csr # repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories] repositories = [Repository(url=i) for i in project.repositories] + secrets_to_create: list[V1Secret] = [] server = Renku2UserServer( user=user, image=image, @@ -823,7 +860,7 @@ async def _handler( server_options=parsed_server_options, environment_variables={}, user_secrets=user_secrets, - cloudstorage=cloud_storage, + cloudstorage=[i for i in cloud_storage.values()], k8s_client=self.nb_config.k8s_v2_client, workspace_mount_path=work_dir, work_dir=work_dir, @@ -833,6 +870,14 @@ async def _handler( is_image_private=False, internal_gitlab_user=internal_gitlab_user, ) + # Generate the cloud storage secrets + data_sources: list[DataSource] = [] + for ics, cs in enumerate(cloud_storage.values()): + secret_name = f"{server_name}-ds-{ics}" + secrets_to_create.append(cs.secret(secret_name, 
server.k8s_client.preferred_namespace)) + data_sources.append( + DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True)) + ) cert_init, cert_vols = init_containers.certificates_container(self.nb_config) session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))] extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols] @@ -865,9 +910,7 @@ async def _handler( manifest = AmaltheaSessionV1Alpha1( metadata=Metadata(name=server_name, annotations=annotations), spec=AmaltheaSessionSpec( - adoptSecrets=True, codeRepositories=[], - dataSources=[], hibernated=False, session=Session( image=image, @@ -895,7 +938,9 @@ async def _handler( host=self.nb_config.sessions.ingress.host, ingressClassName=self.nb_config.sessions.ingress.annotations.get("kubernetes.io/ingress.class"), annotations=self.nb_config.sessions.ingress.annotations, - tlsSecretName=self.nb_config.sessions.ingress.tls_secret, + tlsSecret=TlsSecret(name=self.nb_config.sessions.ingress.tls_secret, adopt=True) + if self.nb_config.sessions.ingress.tls_secret is not None + else None, ), extraContainers=extra_containers, initContainers=session_init_containers, @@ -912,13 +957,14 @@ async def _handler( type=AuthenticationType.oauth2proxy if isinstance(user, AuthenticatedAPIUser) else AuthenticationType.token, - secretRef=SecretRef(name=server_name, key="auth"), + secretRef=SecretRefKey(name=server_name, key="auth", adopt=True), extraVolumeMounts=[ ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails") ] if isinstance(user, AuthenticatedAPIUser) else [], ), + dataSources=data_sources, ), ) parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2")) @@ -949,12 +995,14 @@ async def _handler( "verbose": True, } ) - secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data) - secret = await self.nb_config.k8s_v2_client.create_secret(secret) + secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.create_secret(s) try: manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id) except Exception: - await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name) raise errors.ProgrammingError(message="Could not start the amalthea session") return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201) diff --git a/components/renku_data_services/notebooks/cr_amalthea_session.py b/components/renku_data_services/notebooks/cr_amalthea_session.py index 061b25b76..a4c2e3fd9 100644 --- a/components/renku_data_services/notebooks/cr_amalthea_session.py +++ b/components/renku_data_services/notebooks/cr_amalthea_session.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: -# timestamp: 2024-08-23T21:15:52+00:00 +# timestamp: 2024-09-04T21:22:45+00:00 from __future__ import annotations @@ -43,6 +43,10 @@ class SecretRef(BaseCRD): model_config = ConfigDict( extra="allow", ) + adopt: Optional[bool] = Field( + default=None, + description="If the secret is adopted then the operator will delete the secret when the custom resource that uses it is deleted.", + ) key: str name: str @@ -72,6 +76,10 @@ class CloningConfigSecretRef(BaseCRD): model_config = ConfigDict( extra="allow", ) + adopt: Optional[bool] = Field( + 
default=None, + description="If the secret is adopted then the operator will delete the secret when the custom resource that uses it is deleted.", + ) key: str name: str @@ -80,6 +88,10 @@ class ConfigSecretRef(BaseCRD): model_config = ConfigDict( extra="allow", ) + adopt: Optional[bool] = Field( + default=None, + description="If the secret is adopted then the operator will delete the secret when the custom resource that uses it is deleted.", + ) key: str name: str @@ -147,6 +159,17 @@ class Culling(BaseCRD): ) +class SecretRef1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + adopt: Optional[bool] = Field( + default=None, + description="If the secret is adopted then the operator will delete the secret when the custom resource that uses it is deleted.", + ) + name: str + + class Type2(Enum): rclone = "rclone" @@ -155,12 +178,15 @@ class DataSource(BaseCRD): model_config = ConfigDict( extra="allow", ) + accessMode: str = Field( + default="ReadOnlyMany", description="The access mode for the data source" + ) mountPath: str = Field( default="data", description="Path relative to the session working directory where the data should be mounted", example="data/storages", ) - secretRef: Optional[SecretRef] = Field( + secretRef: Optional[SecretRef1] = Field( default=None, description="The secret containing the configuration or credentials needed for access to the data.\nThe format of the configuration that is expected depends on the storage type.\nNOTE: define all values in a single key of the Kubernetes secret.\nrclone: any valid rclone configuration for a single remote, see the output of `rclone config providers` for validation and format.", ) @@ -2062,6 +2088,17 @@ class ExtraVolume(BaseCRD): ) +class TlsSecret(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + adopt: Optional[bool] = Field( + default=None, + description="If the secret is adopted then the operator will delete the secret when the custom resource that uses it is deleted.", + ) + name: str + + class Ingress(BaseCRD): model_config = ConfigDict( extra="allow", @@ -2069,7 +2106,7 @@ class Ingress(BaseCRD): annotations: Optional[Dict[str, str]] = None host: str ingressClassName: Optional[str] = None - tlsSecretName: Optional[str] = Field( + tlsSecret: Optional[TlsSecret] = Field( default=None, description="The name of the TLS secret, same as what is specified in a regular Kubernetes Ingress.", ) @@ -2701,7 +2738,11 @@ class Session(BaseCRD): default=None, description="Resource requirements and limits in the same format as a Pod in Kubernetes", ) - runAsGroup: int = Field(default=1000, ge=0) + runAsGroup: int = Field( + default=1000, + description="The group is set on the session and this value is also set as the fsgroup for the whole pod and all session\ncontianers.", + ge=0, + ) runAsUser: int = Field(default=1000, ge=0) shmSize: Optional[Union[int, str]] = Field( default=None, description="Size of /dev/shm" @@ -2721,10 +2762,6 @@ class Spec(BaseCRD): model_config = ConfigDict( extra="allow", ) - adoptSecrets: bool = Field( - ..., - description="Whether to adopt all secrets referred to by name in this CR. 
Adopted secrets will be deleted when the CR is deleted.", - ) authentication: Optional[Authentication] = Field( default=None, description="Authentication configuration for the session" ) diff --git a/components/renku_data_services/notebooks/crs.py b/components/renku_data_services/notebooks/crs.py index 6bac792dc..f8d4df091 100644 --- a/components/renku_data_services/notebooks/crs.py +++ b/components/renku_data_services/notebooks/crs.py @@ -24,12 +24,15 @@ SecretRef, Session, Storage, + TlsSecret, ) from renku_data_services.notebooks.cr_amalthea_session import EnvItem2 as SessionEnvItem from renku_data_services.notebooks.cr_amalthea_session import Item4 as SecretAsVolumeItem from renku_data_services.notebooks.cr_amalthea_session import Model as _ASModel from renku_data_services.notebooks.cr_amalthea_session import Resources3 as Resources from renku_data_services.notebooks.cr_amalthea_session import Secret1 as SecretAsVolume +from renku_data_services.notebooks.cr_amalthea_session import SecretRef as SecretRefKey +from renku_data_services.notebooks.cr_amalthea_session import SecretRef1 as SecretRefWhole from renku_data_services.notebooks.cr_amalthea_session import Spec as AmaltheaSessionSpec from renku_data_services.notebooks.cr_amalthea_session import Type as AuthenticationType from renku_data_services.notebooks.cr_amalthea_session import Type1 as CodeRepositoryType @@ -167,7 +170,7 @@ def as_apispec(self) -> apispec.SessionResponse: url = "None" if self.status.url is None or self.status.url == "" or self.status.url.lower() == "None": if self.spec is not None and self.spec.ingress is not None: - scheme = "https" if self.spec.ingress.tlsSecretName is not None else "http" + scheme = "https" if self.spec.ingress.tlsSecret is not None else "http" url = urljoin(f"{scheme}://{self.spec.ingress.host}", self.spec.session.urlPath) else: url = self.status.url
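
As a quick illustration of the new wiring, a minimal sketch (assuming only the kubernetes Python client) of the Secret that RCloneStorage.secret() builds for the csi-rclone driver and of the dataSources entry that references it. The remote name, path, and rclone settings below are hypothetical examples; only the remote/remotePath/configData keys, the "{server_name}-ds-{index}" secret naming, and the secretRef/adopt behaviour come from this patch.

from kubernetes import client

# Hypothetical values; the patch derives the secret name from the session name
# ("{server_name}-ds-{index}") and the rclone settings from the project's saved cloud storage.
base_name = "my-session-ds-0"
rclone_config = "[example-s3]\ntype = s3\nprovider = AWS\n"

# Same keys as RCloneStorage.secret(): the csi-rclone driver reads the remote name,
# the path inside the remote, and the full rclone configuration from one Secret.
secret = client.V1Secret(
    metadata=client.V1ObjectMeta(name=base_name, labels={"name": base_name}),
    string_data={
        "remote": "example-s3",
        "remotePath": "bucket/path",
        "configData": rclone_config,
    },
)

# The AmaltheaSession spec then mounts it through dataSources; adopt=true tells the
# operator to delete the Secret together with the session custom resource.
data_source = {"mountPath": "data/storages", "secretRef": {"name": base_name, "adopt": True}}

print(client.ApiClient().sanitize_for_serialization(secret))
print(data_source)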