diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b1e4ed5..bee4c9b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,19 @@ Changelog ========= +0.17.0 (DEVELOPMENT) +==================== + +**💥⚠️ Breaking changes** + +* Removed the ``generate_digid_metadata``, ``generate_eherkenning_metadata`` and + ``generate_eherkenning_dienstcatalogus`` management commands. This metadata is + available through the admin interface and existing URLs/views. + +**Features** + +... + 0.16.0 (2024-07-02) =================== diff --git a/digid_eherkenning/admin.py b/digid_eherkenning/admin.py index f7fdf5f..aa7d3d4 100644 --- a/digid_eherkenning/admin.py +++ b/digid_eherkenning/admin.py @@ -1,11 +1,16 @@ +from datetime import datetime + from django.contrib import admin +from django.urls import reverse +from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ from privates.admin import PrivateMediaMixin from privates.widgets import PrivateFileWidget from solo.admin import SingletonModelAdmin -from .models import DigidConfiguration, EherkenningConfiguration +from .models import ConfigCertificate, DigidConfiguration, EherkenningConfiguration +from .models.base import BaseConfiguration class CustomPrivateFileWidget(PrivateFileWidget): @@ -16,14 +21,39 @@ class CustomPrivateMediaMixin(PrivateMediaMixin): private_media_file_widget = CustomPrivateFileWidget -@admin.register(DigidConfiguration) -class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin): - readonly_fields = ("idp_service_entity_id",) - fieldsets = ( +class BaseAdmin(CustomPrivateMediaMixin, SingletonModelAdmin): + readonly_fields = ( + "link_to_certificates", + "idp_service_entity_id", + ) + private_media_fields = ("idp_metadata_file",) + + @admin.display(description=_("certificates")) + def link_to_certificates(self, obj: BaseConfiguration) -> str: + path = reverse( + "admin:digid_eherkenning_configcertificate_changelist", + current_app=self.admin_site.name, + ) + config_type = obj._as_config_type() + qs = ConfigCertificate.objects.filter(config_type=config_type) + url = f"{path}?config_type__exact={config_type}" + return format_html( + '{label}', + url=url, + config_type=config_type.value, + label=_("Manage ({count})").format(count=qs.count()), + ) + + +def _fieldset_factory(middle): + """ + Output custom fieldsets (model-specific) between fixed shared field(set)s. + """ + head = [ ( _("X.509 Certificate"), { - "fields": ("certificate",), + "fields": ("link_to_certificates",), }, ), ( @@ -50,18 +80,8 @@ class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin): ), }, ), - ( - _("Service details"), - { - "fields": ( - "service_name", - "service_description", - "requested_attributes", - "attribute_consuming_service_index", - "slo", - ), - }, - ), + ] + tail = [ ( _("Organization details"), { @@ -73,96 +93,103 @@ class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin): ), }, ), + ] + + return tuple(head + list(middle) + tail) + + +@admin.register(DigidConfiguration) +class DigidConfigurationAdmin(BaseAdmin): + fieldsets = _fieldset_factory( + [ + ( + _("Service details"), + { + "fields": ( + "service_name", + "service_description", + "requested_attributes", + "attribute_consuming_service_index", + "slo", + ), + }, + ), + ] ) change_form_template = "admin/digid_eherkenning/digidconfiguration/change_form.html" - private_media_fields = ("idp_metadata_file",) @admin.register(EherkenningConfiguration) -class EherkenningConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin): - readonly_fields = ("idp_service_entity_id",) - fieldsets = ( - ( - _("X.509 Certificate"), - { - "fields": ("certificate",), - }, - ), - ( - _("Identity provider"), - { - "fields": ( - "metadata_file_source", - "idp_service_entity_id", - "idp_metadata_file", - ), - }, - ), - ( - _("SAML configuration"), - { - "fields": ( - "entity_id", - "base_url", - "artifact_resolve_content_type", - "want_assertions_signed", - "want_assertions_encrypted", - "signature_algorithm", - "digest_algorithm", - ), - }, - ), - ( - _("Service details"), - { - "fields": ( - "service_name", - "service_description", - "oin", - "makelaar_id", - "privacy_policy", - "service_language", - ), - }, - ), - ( - _("eHerkenning"), - { - "fields": ( - "eh_requested_attributes", - "eh_attribute_consuming_service_index", - "eh_service_uuid", - "eh_service_instance_uuid", - "eh_loa", - ), - }, - ), - ( - _("eIDAS"), - { - "fields": ( - "no_eidas", - "eidas_requested_attributes", - "eidas_attribute_consuming_service_index", - "eidas_service_uuid", - "eidas_service_instance_uuid", - "eidas_loa", - ), - }, - ), - ( - _("Organization details"), - { - "fields": ( - "technical_contact_person_telephone", - "technical_contact_person_email", - "organization_url", - "organization_name", - ), - }, - ), +class EherkenningConfigurationAdmin(BaseAdmin): + fieldsets = _fieldset_factory( + [ + ( + _("Service details"), + { + "fields": ( + "service_name", + "service_description", + "oin", + "makelaar_id", + "privacy_policy", + "service_language", + ), + }, + ), + ( + _("eHerkenning"), + { + "fields": ( + "eh_requested_attributes", + "eh_attribute_consuming_service_index", + "eh_service_uuid", + "eh_service_instance_uuid", + "eh_loa", + ), + }, + ), + ( + _("eIDAS"), + { + "fields": ( + "no_eidas", + "eidas_requested_attributes", + "eidas_attribute_consuming_service_index", + "eidas_service_uuid", + "eidas_service_instance_uuid", + "eidas_loa", + ), + }, + ), + ] ) + change_form_template = ( "admin/digid_eherkenning/eherkenningconfiguration/change_form.html" ) - private_media_fields = ("idp_metadata_file",) + + +@admin.register(ConfigCertificate) +class ConfigCertificateAdmin(admin.ModelAdmin): + list_display = ( + "config_type", + "certificate", + "valid_from", + "expiry_date", + "is_ready", + ) + list_filter = ("config_type",) + search_fields = ("certificate__label",) + raw_id_fields = ("certificate",) + + @admin.display(description=_("valid from")) + def valid_from(self, obj: ConfigCertificate) -> datetime: + return obj.certificate.valid_from + + @admin.display(description=_("expires on")) + def expiry_date(self, obj: ConfigCertificate) -> datetime: + return obj.certificate.expiry_date + + @admin.display(description=_("valid candidate?"), boolean=True) + def is_ready(self, obj: ConfigCertificate) -> bool: + return obj.is_ready_for_authn_requests diff --git a/digid_eherkenning/choices.py b/digid_eherkenning/choices.py index 60960c1..d2f9dda 100644 --- a/digid_eherkenning/choices.py +++ b/digid_eherkenning/choices.py @@ -4,6 +4,15 @@ from onelogin.saml2.constants import OneLogin_Saml2_Constants +class ConfigTypes(models.TextChoices): + """ + Maps a config type enum to a configuration model. + """ + + digid = "digid_eherkenning.DigidConfiguration", _("DigiD") + eherkenning = "digid_eherkenning.EherkenningConfiguration", _("eHerkenning") + + class SectorType(models.TextChoices): bsn = "s00000000", "BSN" sofi = "s00000001", "SOFI" diff --git a/digid_eherkenning/exceptions.py b/digid_eherkenning/exceptions.py index f372f00..b04d372 100644 --- a/digid_eherkenning/exceptions.py +++ b/digid_eherkenning/exceptions.py @@ -8,3 +8,9 @@ class eHerkenningError(SAML2Error): class eHerkenningNoRSINError(eHerkenningError): pass + + +class CertificateProblem(Exception): + def __init__(self, msg: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.message = msg diff --git a/digid_eherkenning/management/commands/_base.py b/digid_eherkenning/management/commands/_base.py deleted file mode 100644 index 9aa7c14..0000000 --- a/digid_eherkenning/management/commands/_base.py +++ /dev/null @@ -1,226 +0,0 @@ -from pathlib import Path -from typing import Sequence, Type - -from django.core.files import File -from django.core.management.base import BaseCommand -from django.db import transaction - -from simple_certmanager.constants import CertificateTypes -from simple_certmanager.models import Certificate - -from ...models.base import BaseConfiguration - -try: - from argparse import BooleanOptionalAction -except ImportError: - from ..utils import BooleanOptionalAction - - -class SamlMetadataBaseCommand(BaseCommand): - config_model: Type[BaseConfiguration] - default_certificate_label: str - - def add_arguments(self, parser): - """ - Add arguments that map to configuration model fields. - - You can use a different flag, but then ensure the ``dest`` kwarg is specified. - Options are applied to the specified configuration model instance if a model - field with the same name as the option exists. - """ - # check current config to determine if an option is required or not - config = self._get_config() - has_private_key = config.certificate and config.certificate.private_key - has_certificate = config.certificate and config.certificate.public_certificate - - parser.add_argument( - "--want-assertions-encrypted", - action="store_true", - help="If True the XML assertions need to be encrypted. Defaults to False", - ) - parser.add_argument( - "--no-only-assertions-signed", - dest="want_assertions_signed", - action="store_false", - help=( - "If True, the XML assertions need to be signed, otherwise the whole " - "response needs to be signed. Defaults to only assertions signed." - ), - ) - parser.add_argument( - "--key-file", - required=not has_private_key, - help=( - "The filepath to the TLS key. This will be used both by the SOAP " - "client and for signing the requests." - ), - ) - parser.add_argument( - "--cert-file", - required=not has_certificate, - help=( - "The filepath to the TLS certificate. This will be used both by the " - "SOAP client and for signing the requests." - ), - ) - parser.add_argument( - "--key-passphrase", - help="Passphrase for SOAP client", - default=None, - ) - parser.add_argument( - "--signature-algorithm", - help="Signature algorithm, defaults to RSA_SHA1", - default="http://www.w3.org/2000/09/xmldsig#rsa-sha1", - ) - parser.add_argument( - "--digest-algorithm", - help="Digest algorithm, defaults to SHA1", - default="http://www.w3.org/2000/09/xmldsig#sha1", - ) - parser.add_argument( - "--entity-id", - required=not config.entity_id, - help="Service provider entity ID", - ) - parser.add_argument( - "--base-url", - required=not config.base_url, - help="Base URL of the application", - ) - parser.add_argument( - "--service-name", - required=not config.service_name, - help="The name of the service for which DigiD login is required", - ) - parser.add_argument( - "--service-description", - required=not config.service_description, - help="A description of the service for which DigiD login is required", - ) - parser.add_argument( - "--technical-contact-person-telephone", - help=( - "Telephone number of the technical person responsible for this DigiD " - "setup. For it to be used, --technical-contact-person-email should " - "also be set." - ), - ) - parser.add_argument( - "--technical-contact-person-email", - help=( - "Email address of the technical person responsible for this DigiD " - "setup. For it to be used, --technical-contact-person-telephone " - "should also be set." - ), - ) - parser.add_argument( - "--organization-name", - help=( - "Name of the organisation providing the service for which DigiD login " - "is setup. For it to be used, also --organization-url should be filled." - ), - ) - parser.add_argument( - "--organization-url", - help=( - "URL of the organisation providing the service for which DigiD login " - "is setup. For it to be used, also --organization-name should be " - "filled." - ), - ) - parser.add_argument( - "--output-file", - help=( - "Name of the file to which to write the metadata. Otherwise will be " - "printed on stdout" - ), - ) - parser.add_argument( - "--test", - "--debug", - action="store_true", - help="If True the metadata is printed to stdout. Defaults to False", - ) - parser.add_argument( - "--save-config", - action=BooleanOptionalAction, - required=True, - help="Save the configuration overrides specified via the command line.", - ) - - def get_filename(self) -> str: # pragma:nocover - raise NotImplementedError - - def generate_metadata(self, options: dict) -> bytes: # pragma:nocover - raise NotImplementedError - - def _get_config(self): - if not hasattr(self, "_config"): - self._config = self.config_model.get_solo() - return self._config - - def _set_certificate(self, config: BaseConfiguration, options: dict): - certificate = config.certificate - - # no certificate exists yet -> create one - if certificate is None: - certificate = Certificate.objects.create( - label=self.default_certificate_label, - type=CertificateTypes.key_pair, - ) - config.certificate = certificate - - # enforce that the specified key/certificate are used - for option, filefield in ( - ("key_file", "private_key"), - ("cert_file", "public_certificate"), - ): - filepath = options[option] - if not filepath: - continue - - path = Path(filepath) - with path.open("rb") as infile: - field_file = getattr(certificate, filefield) - field_file.save(path.name, File(infile), save=False) - - certificate.save() - - @transaction.atomic - def _generate_metadata(self, options: dict) -> bytes: - valid_field_names = [f.name for f in self.config_model._meta.get_fields()] - config = self._get_config() - - self._set_certificate(config, options) - - for key, value in options.items(): - if key not in valid_field_names: - continue - # optional, unspecified -> go with the model default or current value - if value is None: - continue - setattr(config, key, value) - - config.save() - - metadata = self.generate_metadata(options) - - transaction.set_rollback(not options["save_config"]) - - return metadata - - def handle(self, *args, **options): - metadata_content = self._generate_metadata(options) - - if options["test"]: - self.stdout.write(metadata_content.decode("utf-8")) - return - - filename = options["output_file"] or self.get_filename() - with open(filename, "xb") as metadata_file: - metadata_file.write(metadata_content) - - self.stdout.write( - self.style.SUCCESS("Metadata file successfully generated: %s" % filename) - ) diff --git a/digid_eherkenning/management/commands/generate_digid_metadata.py b/digid_eherkenning/management/commands/generate_digid_metadata.py deleted file mode 100644 index 3353031..0000000 --- a/digid_eherkenning/management/commands/generate_digid_metadata.py +++ /dev/null @@ -1,41 +0,0 @@ -from django.utils import timezone - -from ...models import DigidConfiguration -from ...saml2.digid import generate_digid_metadata -from ._base import SamlMetadataBaseCommand - -try: - from argparse import BooleanOptionalAction -except ImportError: - from ..utils import BooleanOptionalAction - - -class Command(SamlMetadataBaseCommand): - help = "Create the DigiD metadata file" - config_model = DigidConfiguration - default_certificate_label = "DigiD" - - def add_arguments(self, parser): - super().add_arguments(parser) - - config: DigidConfiguration = self._get_config() - - parser.add_argument( - "--slo", - default=config.slo, - action=BooleanOptionalAction, - help="If '--slo' is present, Single Logout is supported. To turn it off use '--no-slo'", - ) - parser.add_argument( - "--attribute-consuming-service-index", - type=str, - help="Attribute consuming service index, defaults to 1", - default="1", - ) - - def get_filename(self): - date_string = timezone.now().date().isoformat() - return f"digid-metadata-{date_string}.xml" - - def generate_metadata(self, options): - return generate_digid_metadata() diff --git a/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py b/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py deleted file mode 100644 index 60a7a29..0000000 --- a/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py +++ /dev/null @@ -1,63 +0,0 @@ -from django.utils import timezone - -from ...models import EherkenningConfiguration -from ...saml2.eherkenning import generate_dienst_catalogus_metadata -from .generate_eherkenning_metadata import Command as EherkenningCommand - - -def _remove_action_by_dest(parser, dest: str): - for action in parser._actions: - if action.dest != dest: - continue - parser._remove_action(action) - break - - for action in parser._action_groups: - for group_action in action._group_actions: - if group_action.dest != dest: - continue - action._group_actions.remove(group_action) - return - - -class Command(EherkenningCommand): - help = "Create the eHerkenning dienstcatalogus file" - - def add_arguments(self, parser): - super().add_arguments(parser) - - # delete arguments that we don't use - dests_to_delete = [ - "want_assertions_encrypted", - "want_assertions_signed", - "technical_contact_person_telephone", - "technical_contact_person_email", - "organization_url", - ] - # remove actions not relevant for this command, but still re-use the bulk - # from the eherkenning metadata generation command - for dest in dests_to_delete: - _remove_action_by_dest(parser, dest) - - config: EherkenningConfiguration = self._get_config() - - parser.add_argument( - "--privacy-policy", - required=not config.privacy_policy, - help=( - "The URL where the privacy policy from the organisation providing the " - "service can be found." - ), - ) - parser.add_argument( - "--makelaar-id", - required=not config.makelaar_id, - help="OIN of the broker used to set up eHerkenning/eIDAS.", - ) - - def get_filename(self): - date_string = timezone.now().date().isoformat() - return f"eherkenning-dienstcatalogus-{date_string}.xml" - - def generate_metadata(self, options): - return generate_dienst_catalogus_metadata() diff --git a/digid_eherkenning/management/commands/generate_eherkenning_metadata.py b/digid_eherkenning/management/commands/generate_eherkenning_metadata.py deleted file mode 100644 index 9995ed7..0000000 --- a/digid_eherkenning/management/commands/generate_eherkenning_metadata.py +++ /dev/null @@ -1,51 +0,0 @@ -from django.utils import timezone - -from ...models import EherkenningConfiguration -from ...saml2.eherkenning import generate_eherkenning_metadata -from ._base import SamlMetadataBaseCommand - - -class Command(SamlMetadataBaseCommand): - help = "Create the eHerkenning metadata file" - config_model = EherkenningConfiguration - default_certificate_label = "eHerkenning/eIDAS" - - def add_arguments(self, parser): - super().add_arguments(parser) - - config: EherkenningConfiguration = self._get_config() - - parser.add_argument( - "--loa", - help="Level of Assurance (LoA) to use for all the services.", - default="urn:etoegang:core:assurance-class:loa3", - ) - parser.add_argument( - "--eh-attribute-consuming-service-index", - help="Attribute consuming service index for the eHerkenning service, defaults to 9052", - default="9052", - ) - parser.add_argument( - "--eidas-attribute-consuming-service-index", - help="Attribute consuming service index for the eHerkenning service, defaults to 9053", - default="9053", - ) - parser.add_argument( - "--oin", - required=not config.oin, - default=config.oin, - help="The OIN of the company providing the service.", - ) - parser.add_argument( - "--no-eidas", - action="store_true", - help="If True, then the service catalogue will contain only the eHerkenning service. Defaults to False.", - default=False, - ) - - def get_filename(self): - date_string = timezone.now().date().isoformat() - return f"eherkenning-metadata-{date_string}.xml" - - def generate_metadata(self, options): - return generate_eherkenning_metadata() diff --git a/digid_eherkenning/management/commands/update_stored_metadata.py b/digid_eherkenning/management/commands/update_stored_metadata.py index 78f3831..4c09518 100644 --- a/digid_eherkenning/management/commands/update_stored_metadata.py +++ b/digid_eherkenning/management/commands/update_stored_metadata.py @@ -1,7 +1,11 @@ from django.core.management import BaseCommand -from digid_eherkenning.models.digid import DigidConfiguration -from digid_eherkenning.models.eherkenning import EherkenningConfiguration +from ...models import DigidConfiguration, EherkenningConfiguration + +MODEL_MAP = { + "digid": DigidConfiguration, + "eherkenning": EherkenningConfiguration, +} class Command(BaseCommand): @@ -11,15 +15,13 @@ def add_arguments(self, parser): parser.add_argument( "config_model", type=str, - choices=["digid", "eherkenning"], + choices=list(MODEL_MAP.keys()), help="Update the DigiD or Eherkenning configuration metadata.", ) def handle(self, **options): - if options["config_model"] == "digid": - config = DigidConfiguration.get_solo() - elif options["config_model"] == "eherkenning": - config = EherkenningConfiguration.get_solo() + config_model = MODEL_MAP[options["config_model"]] + config = config_model.get_solo() if config.metadata_file_source: config.save(force_metadata_update=True) diff --git a/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py b/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py new file mode 100644 index 0000000..932e468 --- /dev/null +++ b/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py @@ -0,0 +1,65 @@ +# Generated by Django 4.2.13 on 2024-07-19 10:53 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("simple_certmanager", "0002_alter_certificate_private_key_and_more"), + ("digid_eherkenning", "0010_remove_digidconfiguration_key_passphrase_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="ConfigCertificate", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "config_type", + models.CharField( + choices=[ + ("digid_eherkenning.DigidConfiguration", "DigiD"), + ( + "digid_eherkenning.EherkenningConfiguration", + "eHerkenning", + ), + ], + max_length=100, + verbose_name="config type", + ), + ), + ( + "certificate", + models.ForeignKey( + help_text="Certificate that may be used by the specified configuration. The best matching candidate will automatically be selected by the configuration.", + limit_choices_to={"type": "key_pair"}, + on_delete=django.db.models.deletion.PROTECT, + to="simple_certmanager.certificate", + verbose_name="certificate", + ), + ), + ], + options={ + "verbose_name": "DigiD/eHerkenning certificate", + "verbose_name_plural": "DigiD/eHerkenning certificates", + }, + ), + migrations.AddConstraint( + model_name="configcertificate", + constraint=models.UniqueConstraint( + fields=("config_type", "certificate"), + name="uniq_config_cert", + violation_error_message="This configuration and certificate combination already exists.", + ), + ), + ] diff --git a/digid_eherkenning/migrations/0012_move_config_certificate.py b/digid_eherkenning/migrations/0012_move_config_certificate.py new file mode 100644 index 0000000..3e1231e --- /dev/null +++ b/digid_eherkenning/migrations/0012_move_config_certificate.py @@ -0,0 +1,47 @@ +# Generated by Django 4.2.13 on 2024-07-19 12:34 + +from django.db import migrations + +from ..choices import ConfigTypes + + +def move_certificates(apps, _): + DigidConfiguration = apps.get_model("digid_eherkenning", "DigidConfiguration") + EherkenningConfiguration = apps.get_model( + "digid_eherkenning", "EherkenningConfiguration" + ) + ConfigCertificate = apps.get_model("digid_eherkenning", "ConfigCertificate") + + for model in (DigidConfiguration, EherkenningConfiguration): + config = model.objects.first() + if config is None or not (cert := config.certificate): + continue + ConfigCertificate.objects.get_or_create( + certificate=cert, + config_type=ConfigTypes( + f"{config._meta.app_label}.{config._meta.object_name}" + ), + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ( + "digid_eherkenning", + "0011_configcertificate_configcertificate_uniq_config_cert", + ), + ] + + operations = [ + # reverse migration is ambiguous, if needed, you can easily use the UI + migrations.RunPython(move_certificates, migrations.RunPython.noop), + migrations.RemoveField( + model_name="digidconfiguration", + name="certificate", + ), + migrations.RemoveField( + model_name="eherkenningconfiguration", + name="certificate", + ), + ] diff --git a/digid_eherkenning/models/__init__.py b/digid_eherkenning/models/__init__.py index 31dfd85..b46fd90 100644 --- a/digid_eherkenning/models/__init__.py +++ b/digid_eherkenning/models/__init__.py @@ -1,4 +1,9 @@ +from .certificates import ConfigCertificate from .digid import DigidConfiguration from .eherkenning import EherkenningConfiguration -__all__ = ["DigidConfiguration", "EherkenningConfiguration"] +__all__ = [ + "DigidConfiguration", + "EherkenningConfiguration", + "ConfigCertificate", +] diff --git a/digid_eherkenning/models/base.py b/digid_eherkenning/models/base.py index 75bc4c3..ddda5ca 100644 --- a/digid_eherkenning/models/base.py +++ b/digid_eherkenning/models/base.py @@ -10,26 +10,17 @@ from simple_certmanager.models import Certificate from solo.models import SingletonModel -from ..choices import DigestAlgorithms, SignatureAlgorithms, XMLContentTypes - - -class ConfigurationManager(models.Manager): - def get_queryset(self): - qs = super().get_queryset() - return qs.select_related("certificate") +from ..choices import ( + ConfigTypes, + DigestAlgorithms, + SignatureAlgorithms, + XMLContentTypes, +) +from ..exceptions import CertificateProblem +from .certificates import ConfigCertificate class BaseConfiguration(SingletonModel): - certificate = models.ForeignKey( - Certificate, - null=True, - on_delete=models.PROTECT, - verbose_name=_("key pair"), - help_text=_( - "The private key and public certificate pair to use during the " - "authentication flow." - ), - ) idp_metadata_file = PrivateMediaFileField( _("identity provider metadata"), blank=True, @@ -167,8 +158,6 @@ class BaseConfiguration(SingletonModel): max_length=100, ) - objects = ConfigurationManager() - class Meta: abstract = True @@ -240,6 +229,34 @@ def save(self, *args, **kwargs): super().save(*args, **kwargs) def clean(self): - if not self.certificate: - raise ValidationError(_("You must select a certificate")) super().clean() + + # require that a certificate is configured + if not ConfigCertificate.objects.for_config(self).exists(): + raise ValidationError( + _( + "You must prepare at least one certificate for the {verbose_name}." + ).format(verbose_name=self._meta.verbose_name) + ) + + @classmethod + def _as_config_type(cls) -> ConfigTypes: + opts = cls._meta + return ConfigTypes(f"{opts.app_label}.{opts.object_name}") + + def select_certificates(self) -> tuple[Certificate, Certificate | None]: + try: + current_cert, next_cert = ConfigCertificate.objects.for_config( + self + ).select_certificates() + except ConfigCertificate.DoesNotExist as exc: + raise CertificateProblem( + "No (valid) certificate configured. The configuration needs a " + "certificate with private key and public certificate." + ) from exc + else: + # type checker shanigans, mostly + assert isinstance(current_cert, Certificate) + assert next_cert is None or isinstance(next_cert, Certificate) + + return current_cert, next_cert diff --git a/digid_eherkenning/models/certificates.py b/digid_eherkenning/models/certificates.py new file mode 100644 index 0000000..af6df00 --- /dev/null +++ b/digid_eherkenning/models/certificates.py @@ -0,0 +1,201 @@ +""" +Functionality to relate one or more certificates to a SAMLv2 configuration. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, TypeAlias + +from django.db import models +from django.utils import timezone +from django.utils.translation import gettext_lazy as _ + +from simple_certmanager.constants import CertificateTypes +from simple_certmanager.models import Certificate + +from ..choices import ConfigTypes + +if TYPE_CHECKING: + from .digid import DigidConfiguration + from .eherkenning import EherkenningConfiguration + +logger = logging.getLogger(__name__) + +_AnyDigiD: TypeAlias = "type[DigidConfiguration] | DigidConfiguration" +_AnyEH: TypeAlias = "type[EherkenningConfiguration] | EherkenningConfiguration" + + +class ConfigCertificateQuerySet(models.QuerySet["ConfigCertificate"]): + def for_config(self, config: _AnyDigiD | _AnyEH): + config_type = config._as_config_type() + return self.filter(config_type=config_type) + + def select_certificates(self) -> tuple[Certificate, Certificate | None]: + """ + Select the best candidates for the current and next certificate. + + The certificates are used for signing authentication requests (and metadata) + itself, and the possible certificates that perform signing are included in the + metadata. For zero-downtime/gradual replacement, a current and next certificate + can be provided (this is a limitation in python3-saml). + + We look for the current certificate and the next with the following algorithm: + + * order candidates by valid_from, so we favour existing/the oldest keypairs + * order candidates by expiry date, so if they have an identical valid_from, we + favour the one that will expiry first (the other one(s) automatically become + the next certificate + * discard any candidates that do not meet our key pair requirements, ignoring + valid_from/until + + To determine the current certificate: + + * discard candidates that are not valid yet + * discard candiates that are not valid anymore + + If no candidate matches, we raise a DoesNotExist exception. + + If a candidate is found, we select the next certificate according to: + + * must be valid_from >= current_certificate.valid_from + * must not be expired + """ + # XXX: check if this has a big performance impact because we extract the + # valid_from/until by loading the certificate files! + + qs = self.filter(certificate__type=CertificateTypes.key_pair).iterator() + # first pass - filter out anything that is not usable for SAML flows ( + # discarding broken/invalid configurations) + candidates = [ + candidate + for candidate in qs + if candidate._meets_requirements_to_be_used_for_saml() + ] + # sort them - we now know that we can safely access the valid_from and + # expiry_date attributes + candidates = sorted( + candidates, + key=lambda c: (c.certificate.valid_from, c.certificate.expiry_date), + ) + + # figure out which certificate is our current certificate + current_cert: Certificate | None = None + next_cert: Certificate | None = None + + # loop only once, so that we are certain next_cert's validity is *after* current + # cert. + for candidate in candidates: + certificate: Certificate = candidate.certificate + match (current_cert, next_cert): + case (None, None) if candidate.is_ready_for_authn_requests: + current_cert = certificate + continue # the same candidate cannot both be current and next + case (Certificate(), None) if certificate.expiry_date > timezone.now(): + next_cert = certificate + break # we found both current and next + else: + logger.debug("Could not determine a next certificate") + + if current_cert is None: + raise self.model.DoesNotExist( + "Could not find a suitable current certificate" + ) + + return current_cert, next_cert + + +class ConfigCertificateManager(models.Manager.from_queryset(ConfigCertificateQuerySet)): + def get_queryset(self): + qs = super().get_queryset() + return qs.select_related("certificate") + + +class ConfigCertificate(models.Model): + """ + Tie a particular certificate to a configuration model. + """ + + config_type = models.CharField( + _("config type"), + max_length=100, + choices=ConfigTypes.choices, + ) + certificate = models.ForeignKey( + Certificate, + on_delete=models.PROTECT, + # Careful! This does not give any guarantees, you can select a valid certificate + # and then make the certificate instance itself invalid, and end up with a + # cert-only configuration. + limit_choices_to={"type": CertificateTypes.key_pair}, + verbose_name=_("certificate"), + help_text=_( + "Certificate that may be used by the specified configuration. The best " + "matching candidate will automatically be selected by the configuration." + ), + ) + + objects = ConfigCertificateManager() + + class Meta: + verbose_name = _("DigiD/eHerkenning certificate") + verbose_name_plural = _("DigiD/eHerkenning certificates") + constraints = [ + models.UniqueConstraint( + name="uniq_config_cert", + fields=("config_type", "certificate"), + violation_error_message=_( + "This configuration and certificate combination already exists." + ), + ), + ] + + def __str__(self): + config_type = self.get_config_type_display() # type: ignore + _cert = self.certificate if self.certificate_id else None # type: ignore + certificate = str(_cert) if _cert else _("(no certificate selected)") + return f"{config_type}: {certificate}" + + def _meets_requirements_to_be_used_for_saml(self) -> bool: + try: + _certificate: Certificate = self.certificate + except Certificate.DoesNotExist: + return False + + if _certificate.type != CertificateTypes.key_pair: + return False + + if not (privkey := _certificate.private_key) or not privkey.storage.exists( + privkey.name + ): + return False + + # Try loading it with cryptography + try: + _certificate.certificate + except (FileNotFoundError, ValueError) as exc: + logger.info( + "Could not introspect certificate validity", + exc_info=exc, + extra={"certificate_pk": _certificate.pk}, + ) + return False + + return True + + @property + def is_ready_for_authn_requests(self) -> bool: + """ + Introspect the certificate to determine if it's a candidate for authn requests. + """ + if not self._meets_requirements_to_be_used_for_saml(): + return False + + _certificate: Certificate = self.certificate + valid_from, expiry_date = _certificate.valid_from, _certificate.expiry_date + + now = timezone.now() + if not (valid_from <= now <= expiry_date): + return False + + return True diff --git a/digid_eherkenning/models/digid.py b/digid_eherkenning/models/digid.py index 53166fc..66eadc9 100644 --- a/digid_eherkenning/models/digid.py +++ b/digid_eherkenning/models/digid.py @@ -1,4 +1,3 @@ -from django.core.exceptions import ImproperlyConfigured from django.db import models from django.utils.translation import gettext_lazy as _ @@ -45,15 +44,7 @@ def as_dict(self) -> dict: """ organization = None - if ( - not self.certificate - or not self.certificate.private_key - or not self.certificate.public_certificate - ): - raise ImproperlyConfigured( - "No (valid) certificate configured. The configuration needs a " - "certificate with private key and public certificate." - ) + current_cert, next_cert = self.select_certificates() if self.organization_url and self.organization_name: organization = { @@ -68,8 +59,9 @@ def as_dict(self) -> dict: "base_url": self.base_url, "entity_id": self.entity_id, "metadata_file": self.idp_metadata_file, - "key_file": self.certificate.private_key, - "cert_file": self.certificate.public_certificate, + "key_file": current_cert.private_key, + "cert_file": current_cert.public_certificate, + "next_cert_file": next_cert.public_certificate if next_cert else None, "service_entity_id": self.idp_service_entity_id, "attribute_consuming_service_index": self.attribute_consuming_service_index, "service_name": self.service_name, diff --git a/digid_eherkenning/models/eherkenning.py b/digid_eherkenning/models/eherkenning.py index 15c10ef..e67cda1 100644 --- a/digid_eherkenning/models/eherkenning.py +++ b/digid_eherkenning/models/eherkenning.py @@ -1,6 +1,5 @@ import uuid -from django.core.exceptions import ImproperlyConfigured from django.db import models from django.utils.translation import gettext_lazy as _ @@ -198,15 +197,7 @@ def as_dict(self) -> EHerkenningConfig: """ organization = None - if ( - not self.certificate - or not self.certificate.private_key - or not self.certificate.public_certificate - ): - raise ImproperlyConfigured( - "No (valid) certificate configured. The configuration needs a " - "certificate with private key and public certificate." - ) + current_cert, next_cert = self.select_certificates() if self.organization_url and self.organization_name: organization = { @@ -279,8 +270,9 @@ def as_dict(self) -> EHerkenningConfig: "base_url": self.base_url, "entity_id": self.entity_id, "metadata_file": self.idp_metadata_file, - "key_file": self.certificate.private_key, - "cert_file": self.certificate.public_certificate, + "key_file": current_cert.private_key, + "cert_file": current_cert.public_certificate, + "next_cert_file": next_cert.public_certificate if next_cert else None, "service_entity_id": self.idp_service_entity_id, "oin": self.oin, "services": services, diff --git a/digid_eherkenning/saml2/base.py b/digid_eherkenning/saml2/base.py index de5d74c..15baa93 100644 --- a/digid_eherkenning/saml2/base.py +++ b/digid_eherkenning/saml2/base.py @@ -280,11 +280,23 @@ def create_config_dict(self, conf): ], }, "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified", + # Used for: + # * signing the metadata + # * signing authentication requests "x509cert": certificate, + # Used for: + # * signing the metadata + # * signing authentication requests "privateKey": privkey, }, } + # Used to provide the next certificate to be used for signing in the + # metadata so that the IDP can prepare. + if next_cert_file := conf.get("next_cert_file"): + with next_cert_file.open("r") as _next_cert_file: + setting_dict["sp"]["x509certNew"] = _next_cert_file.read() + # check if we need to add the idp metadata_file = conf["metadata_file"] if metadata_file: diff --git a/digid_eherkenning/saml2/eherkenning.py b/digid_eherkenning/saml2/eherkenning.py index 57f48da..5f5f96a 100644 --- a/digid_eherkenning/saml2/eherkenning.py +++ b/digid_eherkenning/saml2/eherkenning.py @@ -1,7 +1,7 @@ import binascii from base64 import b64encode from io import BytesIO -from typing import Union +from typing import Union, no_type_check from uuid import uuid4 from django.urls import reverse @@ -9,7 +9,6 @@ from cryptography.hazmat.primitives import serialization from cryptography.x509 import load_pem_x509_certificate -from furl.furl import furl from lxml.builder import ElementMaker from lxml.etree import Element, tostring from onelogin.saml2.settings import OneLogin_Saml2_Settings @@ -465,35 +464,17 @@ def conf(self) -> EHerkenningConfig: self._conf.setdefault("acs_path", reverse("eherkenning:acs")) return self._conf + @no_type_check # my editor has more red than the red wedding in GOT def create_config_dict(self, conf: EHerkenningConfig) -> EHerkenningSAMLConfig: config_dict: EHerkenningSAMLConfig = super().create_config_dict(conf) + sp_config = config_dict["sp"] + + # we have multiple services, so delete the config for the "single service" variant attribute_consuming_services = create_attribute_consuming_services(conf) - with ( - conf["cert_file"].open("r") as cert_file, - conf["key_file"].open("r") as key_file, - ): - certificate = cert_file.read() - privkey = key_file.read() - acs_url = furl(conf["base_url"]) / conf["acs_path"] - config_dict.update( - { - "sp": { - # Identifier of the SP entity (must be a URI) - "entityId": conf["entity_id"], - # Specifies info about where and how the message MUST be - # returned to the requester, in this case our SP. - "assertionConsumerService": { - "url": acs_url.url, - "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact", - }, - "attributeConsumingServices": attribute_consuming_services, - "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified", - "x509cert": certificate, - "privateKey": privkey, - }, - } - ) + del sp_config["attributeConsumingService"] + sp_config["attributeConsumingServices"] = attribute_consuming_services + return config_dict def create_config( diff --git a/digid_eherkenning/types.py b/digid_eherkenning/types.py index c8d12b8..36ef8c8 100644 --- a/digid_eherkenning/types.py +++ b/digid_eherkenning/types.py @@ -1,6 +1,8 @@ from pathlib import Path from typing import Optional, TypedDict, Union +from django.db.models.fields.files import FieldFile + class ServiceConfig(TypedDict): service_uuid: str @@ -25,8 +27,8 @@ class EHerkenningConfig(TypedDict): acs_path: str entity_id: str metadata_file: str - cert_file: Path - key_file: Path + cert_file: Path | FieldFile + key_file: Path | FieldFile service_entity_id: str oin: str services: list[ServiceConfig] diff --git a/pyproject.toml b/pyproject.toml index ee015c5..59e0bff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "cryptography>=40.0.0", "django>=4.2.0", "django-sessionprofile", - "django-simple-certmanager>=2.2.0", + "django-simple-certmanager>=2.3.0", "django-solo", "lxml>=4.7.1", "furl", diff --git a/tests/conftest.py b/tests/conftest.py index 32c0bfc..3964af4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,13 +1,21 @@ +from io import BytesIO from pathlib import Path from django.core.files import File import pytest import responses +from cryptography.hazmat.primitives.asymmetric import rsa from simple_certmanager.constants import CertificateTypes from simple_certmanager.models import Certificate +from simple_certmanager.test.certificate_generation import key_to_pem -from digid_eherkenning.models import DigidConfiguration, EherkenningConfiguration +from digid_eherkenning.choices import ConfigTypes +from digid_eherkenning.models import ( + ConfigCertificate, + DigidConfiguration, + EherkenningConfiguration, +) BASE_DIR = Path(__file__).parent.resolve() @@ -72,7 +80,7 @@ def digid_certificate(temp_private_root) -> Certificate: DIGID_TEST_KEY_FILE.open("rb") as privkey, DIGID_TEST_CERTIFICATE_FILE.open("rb") as cert, ): - certificate, created = Certificate.objects.get_or_create( + certificate, _ = Certificate.objects.get_or_create( label="DigiD Tests", defaults={"type": CertificateTypes.key_pair}, ) @@ -84,8 +92,11 @@ def digid_certificate(temp_private_root) -> Certificate: @pytest.fixture def digid_config_defaults(digid_certificate, temp_private_root): config = DigidConfiguration.get_solo() - if config.certificate != digid_certificate: - config.certificate = digid_certificate + # set up certificate + ConfigCertificate.objects.filter(config_type=ConfigTypes.digid).delete() + ConfigCertificate.objects.create( + config_type=ConfigTypes.digid, certificate=digid_certificate + ) with DIGID_TEST_METADATA_FILE.open("rb") as metadata_file: config.idp_metadata_file.save("metadata", File(metadata_file), save=False) config.save() @@ -109,7 +120,7 @@ def eherkenning_certificate(temp_private_root) -> Certificate: EHERKENNING_TEST_KEY_FILE.open("rb") as privkey, EHERKENNING_TEST_CERTIFICATE_FILE.open("rb") as cert, ): - certificate, created = Certificate.objects.get_or_create( + certificate, _ = Certificate.objects.get_or_create( label="eHerkenning Tests", defaults={"type": CertificateTypes.key_pair}, ) @@ -121,8 +132,10 @@ def eherkenning_certificate(temp_private_root) -> Certificate: @pytest.fixture def eherkenning_config_defaults(eherkenning_certificate): config = EherkenningConfiguration.get_solo() - if config.certificate != eherkenning_certificate: - config.certificate = eherkenning_certificate + ConfigCertificate.objects.filter(config_type=ConfigTypes.eherkenning).delete() + ConfigCertificate.objects.create( + config_type=ConfigTypes.eherkenning, certificate=eherkenning_certificate + ) with EHERKENNING_TEST_METADATA_FILE.open("rb") as metadata_file: config.idp_metadata_file.save("metadata", File(metadata_file), save=False) config.save() @@ -144,3 +157,19 @@ def eherkenning_config(eherkenning_config_defaults): def mocked_responses(): with responses.RequestsMock() as rsps: yield rsps + + +@pytest.fixture +def next_certificate(leaf_keypair: tuple[rsa.RSAPrivateKey, bytes]) -> Certificate: + """ + Generate a key + certificate pair valid from timezone.now(). + """ + key, cert_pem = leaf_keypair + key_pem = key_to_pem(key) + certificate = Certificate.objects.create( + label="Next certificate", + type=CertificateTypes.key_pair, + public_certificate=File(BytesIO(cert_pem), name="public_certificate.pem"), + private_key=File(BytesIO(key_pem), name="private_key.pem"), + ) + return certificate diff --git a/tests/test_admin.py b/tests/test_admin.py new file mode 100644 index 0000000..6f8b456 --- /dev/null +++ b/tests/test_admin.py @@ -0,0 +1,36 @@ +from django.test import Client +from django.urls import reverse +from django.utils.translation import gettext as _ + +import pytest +from pytest_django.asserts import assertContains + +from digid_eherkenning.models import DigidConfiguration, EherkenningConfiguration + + +@pytest.mark.django_db +def test_digid_configuration_admin_certificates_link( + admin_client: Client, + digid_config: DigidConfiguration, +): + url = reverse("admin:digid_eherkenning_digidconfiguration_change", args=(1,)) + + response = admin_client.get(url) + + assert response.status_code == 200 + assertContains(response, _("certificates")) + assertContains(response, _("Manage ({count})").format(count=1)) + + +@pytest.mark.django_db +def test_eherkenning_configuration_admin_certificates_link( + admin_client: Client, + eherkenning_config: EherkenningConfiguration, +): + url = reverse("admin:digid_eherkenning_eherkenningconfiguration_change", args=(1,)) + + response = admin_client.get(url) + + assert response.status_code == 200 + assertContains(response, _("certificates")) + assertContains(response, _("Manage ({count})").format(count=1)) diff --git a/tests/test_certificate_admin.py b/tests/test_certificate_admin.py new file mode 100644 index 0000000..c9c9aea --- /dev/null +++ b/tests/test_certificate_admin.py @@ -0,0 +1,22 @@ +from django.test import Client +from django.urls import reverse + +from pytest_django.asserts import assertContains + +from digid_eherkenning.choices import ConfigTypes +from digid_eherkenning.models import ConfigCertificate + + +def test_admin_changelist(admin_client: Client, digid_certificate): + url = reverse("admin:digid_eherkenning_configcertificate_changelist") + digid_certificate.label = "DigiD certificate" + digid_certificate.save() + ConfigCertificate.objects.create( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + response = admin_client.get(url) + + assert response.status_code == 200 + assertContains(response, "DigiD certificate") diff --git a/tests/test_certificate_models.py b/tests/test_certificate_models.py new file mode 100644 index 0000000..3bb226d --- /dev/null +++ b/tests/test_certificate_models.py @@ -0,0 +1,296 @@ +from datetime import datetime, timedelta +from io import BytesIO + +from django.core.files import File +from django.utils import timezone + +import pytest +from cryptography import x509 +from freezegun import freeze_time +from simple_certmanager.constants import CertificateTypes +from simple_certmanager.models import Certificate +from simple_certmanager.test.certificate_generation import ( + cert_to_pem, + gen_key, + key_to_pem, + mkcert, +) + +from digid_eherkenning.choices import ConfigTypes +from digid_eherkenning.exceptions import CertificateProblem +from digid_eherkenning.models import ConfigCertificate +from digid_eherkenning.models.digid import DigidConfiguration +from digid_eherkenning.models.eherkenning import EherkenningConfiguration + +pytestmark = [pytest.mark.django_db] + + +def test_valid_certificate(temp_private_root, digid_certificate): + # note that this test will start failing once the certificates expire IRL (!) + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + assert config_certificate.is_ready_for_authn_requests + + +@freeze_time("2099-01-01") +def test_expired_certificate(temp_private_root, digid_certificate): + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + assert not config_certificate.is_ready_for_authn_requests + + +@freeze_time("1700-01-01") +def test_certificate_not_valid_yet(temp_private_root, digid_certificate): + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + assert not config_certificate.is_ready_for_authn_requests + + +def test_certificate_file_missing(): + # can happen if the infrastructure has an oopsie... + certificate = Certificate.objects.create( + type=CertificateTypes.key_pair, + public_certificate="bad/filepath/cert.pem", + ) + assert certificate.public_certificate.path + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=certificate, + ) + + assert not config_certificate.is_ready_for_authn_requests + + +def test_instance_without_certificate_provided(): + config_certificate = ConfigCertificate(config_type=ConfigTypes.digid) + + assert not config_certificate.is_ready_for_authn_requests + + +def test_certificate_wrong_type(temp_private_root, digid_certificate): + digid_certificate.type = CertificateTypes.cert_only + digid_certificate.save() + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + assert not config_certificate.is_ready_for_authn_requests + + +@pytest.mark.parametrize("path", ("", "missing/dir/bad-key-path.pem")) +def test_private_key_missing(temp_private_root, digid_certificate, path): + digid_certificate.private_key = path + digid_certificate.save() + config_certificate = ConfigCertificate( + config_type=ConfigTypes.digid, + certificate=digid_certificate, + ) + + assert not config_certificate.is_ready_for_authn_requests + + +def test_string_representation(settings, digid_certificate): + settings.LANGUAGE_CODE = "en" + digid_certificate.label = "SAML" + cc1 = ConfigCertificate( + config_type=ConfigTypes.digid, certificate=digid_certificate + ) + assert str(cc1) == "DigiD: SAML" + + cc2 = ConfigCertificate(config_type=ConfigTypes.eherkenning, certificate=None) + assert str(cc2) == "eHerkenning: (no certificate selected)" + + +# Helpers for multiple certificates - can't call fixtures multiple times to get +# different outcomes. + + +def _generate_config_certificate( + request: pytest.FixtureRequest, + config_type: ConfigTypes, + valid_from: datetime, +) -> Certificate: + request.getfixturevalue("temp_private_root") + subject = x509.Name( + [ + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "Some-State"), + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "OpenGem"), + x509.NameAttribute(x509.NameOID.COMMON_NAME, "widgits.example.org"), + ] + ) + root_cert = request.getfixturevalue("root_cert") + root_key = request.getfixturevalue("root_key") + + key = gen_key() + # hack to work around the hardcoded timezone.now() usage in the mkcert helper + assert valid_from.tzinfo is not None, "Thou shall not use naive datetimes" + with freeze_time(valid_from): + cert = mkcert( + subject=subject, + subject_key=key, + issuer=root_cert, + issuer_key=root_key, + can_issue=False, + ) + + cert_pem, key_pem = cert_to_pem(cert), key_to_pem(key) + certificate = Certificate.objects.create( + label=f"Certificate, {valid_from.isoformat()}", + type=CertificateTypes.key_pair, + public_certificate=File(BytesIO(cert_pem), name="public_certificate.pem"), + private_key=File(BytesIO(key_pem), name="private_key.pem"), + ) + ConfigCertificate.objects.create(config_type=config_type, certificate=certificate) + return certificate + + +@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration)) +def test_certificate_selection_picks_correct( + request: pytest.FixtureRequest, + model: type[DigidConfiguration] | type[EherkenningConfiguration], +): + config = model.get_solo() + config_type = config._as_config_type() + # expired + _generate_config_certificate( + request, config_type, valid_from=timezone.now() - timedelta(days=5) + ) + # currently valid + _current_cert = _generate_config_certificate( + request, config_type, valid_from=timezone.now() + ) + # valid tomorrow + _next_cert = _generate_config_certificate( + request, + config_type, + valid_from=timezone.now() + timedelta(days=1), + ) + + current_cert, next_cert = config.select_certificates() + + assert current_cert == _current_cert + assert next_cert == _next_cert + + +@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration)) +def test_certificate_selection_picks_correct_2( + request: pytest.FixtureRequest, + model: type[DigidConfiguration] | type[EherkenningConfiguration], +): + config = model.get_solo() + config_type = config._as_config_type() + # expired + _generate_config_certificate( + request, config_type, valid_from=timezone.now() - timedelta(days=5) + ) + # currently valid + _old_current = _generate_config_certificate( + request, config_type, valid_from=timezone.now() + ) + # valid tomorrow + _new_current = _generate_config_certificate( + request, + config_type, + valid_from=timezone.now() + timedelta(days=1), + ) + + # "current" has now expired + with freeze_time(timezone.now() + timedelta(days=1, hours=1)): + assert timezone.now() > _old_current.expiry_date + current_cert, next_cert = config.select_certificates() + + assert current_cert == _new_current + assert next_cert is None + + +@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration)) +def test_no_current_certificate( + request: pytest.FixtureRequest, + model: type[DigidConfiguration] | type[EherkenningConfiguration], +): + config = model.get_solo() + config_type = config._as_config_type() + # expired + _generate_config_certificate( + request, config_type, valid_from=timezone.now() - timedelta(days=5) + ) + + with pytest.raises(CertificateProblem): + config.select_certificates() + + +@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration)) +def test_skips_invalid_certificates_for_current( + request: pytest.FixtureRequest, + model: type[DigidConfiguration] | type[EherkenningConfiguration], +): + config = model.get_solo() + config_type = config._as_config_type() + now = timezone.now() + + # all are currently valid - but we'll introduce problems + (c1, c2, c3, c4, c5) = [ + _generate_config_certificate(request, config_type, valid_from=now) + for _ in range(0, 5) + ] + # must be keypair + c1.type = CertificateTypes.cert_only + c1.save() + c2.private_key = "" + c2.save() + c3.public_certificate = "" + c3.save() + c4.public_certificate.storage.delete(c4.public_certificate.name) + c5.private_key.storage.delete(c5.public_certificate.name) + # this one is okay + _current = _generate_config_certificate(request, config_type, valid_from=now) + + current_cert, next_cert = config.select_certificates() + + assert current_cert == _current + assert next_cert is None + + +@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration)) +def test_skips_invalid_certificates_for_next( + request: pytest.FixtureRequest, + model: type[DigidConfiguration] | type[EherkenningConfiguration], +): + config = model.get_solo() + config_type = config._as_config_type() + now = timezone.now() + + # all are currently valid - but we'll introduce problems + (c1, c2, c3, c4, c5) = [ + _generate_config_certificate( + request, config_type, valid_from=now + timedelta(days=1) + ) + for _ in range(0, 5) + ] + # must be keypair + c1.type = CertificateTypes.cert_only + c1.save() + c2.private_key = "" + c2.save() + c3.public_certificate = "" + c3.save() + c4.public_certificate.storage.delete(c4.public_certificate.name) + c5.private_key.storage.delete(c5.public_certificate.name) + # this one is okay + _current = _generate_config_certificate(request, config_type, valid_from=now) + + current_cert, next_cert = config.select_certificates() + + assert current_cert == _current + assert next_cert is None diff --git a/tests/test_dienst_catalogus_creation.py b/tests/test_dienst_catalogus_creation.py index 054b6c7..7e1e7b7 100644 --- a/tests/test_dienst_catalogus_creation.py +++ b/tests/test_dienst_catalogus_creation.py @@ -1,6 +1,3 @@ -from io import StringIO - -from django.core.management import CommandError, call_command from django.test import TestCase import pytest @@ -13,7 +10,6 @@ generate_dienst_catalogus_metadata, ) -from .conftest import EHERKENNING_TEST_CERTIFICATE_FILE, EHERKENNING_TEST_KEY_FILE from .mixins import EherkenningMetadataMixin NAMESPACES = { @@ -274,7 +270,7 @@ def test_catalogus_with_requested_attributes_without_purpose_statement( @pytest.mark.django_db def test_makelaar_oin_is_configurable(eherkenning_config_defaults, temp_private_root): config = EherkenningConfiguration.get_solo() - config.organisation_name = "Example" + config.organization_name = "Example" config.service_name = "Example" config.oin = "00000000000000000000" config.makelaar_id = "00000000000000000123" @@ -679,331 +675,3 @@ def test_no_eidas_service(self): namespaces=NAMESPACES, ) self.assertEqual(0, len(classifier_node)) - - -@pytest.mark.django_db -def test_generate_metadata_all_options_specified(temp_private_root): - stdout = StringIO() - - call_command( - "generate_eherkenning_dienstcatalogus", - "--no-save-config", - stdout=stdout, - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - organization_name="Test Organisation", - eh_attribute_consuming_service_index="9050", - eidas_attribute_consuming_service_index="9051", - oin="00000001112223330000", - service_name="Test Service Name", - service_description="Test Service Description", - makelaar_id="00000003332221110000", - privacy_policy="http://test-privacy.nl", - test=True, - ) - - output = stdout.getvalue() - service_catalogue_node = etree.XML(output.encode("utf-8")) - - signature_algorithm_node = service_catalogue_node.find( - ".//ds:SignatureMethod", - namespaces=NAMESPACES, - ) - assert ( - signature_algorithm_node.attrib["Algorithm"] - == "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" - ) - - digest_algorithm_node = service_catalogue_node.find( - ".//ds:DigestMethod", - namespaces=NAMESPACES, - ) - assert ( - digest_algorithm_node.attrib["Algorithm"] - == "http://www.w3.org/2001/04/xmlenc#sha256" - ) - - # Service Provider - service_provider_id_node = service_catalogue_node.find( - ".//esc:ServiceProviderID", - namespaces=NAMESPACES, - ) - assert service_provider_id_node.text == "00000001112223330000" - - oganisation_display_node = service_catalogue_node.find( - ".//esc:OrganizationDisplayName", - namespaces=NAMESPACES, - ) - assert oganisation_display_node.text == "Test Organisation" - - # Services - service_definition_nodes = service_catalogue_node.findall( - ".//esc:ServiceDefinition", - namespaces=NAMESPACES, - ) - assert len(service_definition_nodes) == 2 - - eherkenning_definition_node, eidas_definition_node = service_definition_nodes - - # eHerkenning service definition - uuid_node = eherkenning_definition_node.find( - ".//esc:ServiceUUID", - namespaces=NAMESPACES, - ) - assert uuid_node is not None - - service_name_node = eherkenning_definition_node.find( - ".//esc:ServiceName", - namespaces=NAMESPACES, - ) - assert service_name_node.text == "Test Service Name" - - service_description_node = eherkenning_definition_node.find( - ".//esc:ServiceDescription", - namespaces=NAMESPACES, - ) - assert service_description_node.text == "Test Service Description" - - loa_node = eherkenning_definition_node.find( - ".//saml:AuthnContextClassRef", - namespaces=NAMESPACES, - ) - assert loa_node.text == "urn:etoegang:core:assurance-class:loa3" - - makelaar_id_node = eherkenning_definition_node.find( - ".//esc:HerkenningsmakelaarId", - namespaces=NAMESPACES, - ) - assert makelaar_id_node.text == "00000003332221110000" - - entity_concerned_nodes = eherkenning_definition_node.findall( - ".//esc:EntityConcernedTypesAllowed", - namespaces=NAMESPACES, - ) - assert len(entity_concerned_nodes) == 3 - assert entity_concerned_nodes[0].attrib["setNumber"] == "1" - assert entity_concerned_nodes[0].text == "urn:etoegang:1.9:EntityConcernedID:RSIN" - assert entity_concerned_nodes[1].attrib["setNumber"] == "1" - assert entity_concerned_nodes[1].text == "urn:etoegang:1.9:EntityConcernedID:KvKnr" - assert entity_concerned_nodes[2].attrib["setNumber"] == "2" - assert entity_concerned_nodes[2].text == "urn:etoegang:1.9:EntityConcernedID:KvKnr" - - # eIDAS service definition - uuid_node = eidas_definition_node.find( - ".//esc:ServiceUUID", - namespaces=NAMESPACES, - ) - assert uuid_node is not None - - service_name_node = eidas_definition_node.find( - ".//esc:ServiceName", - namespaces=NAMESPACES, - ) - assert service_name_node.text == "Test Service Name (eIDAS)" - - service_description_node = eidas_definition_node.find( - ".//esc:ServiceDescription", - namespaces=NAMESPACES, - ) - assert service_description_node.text == "Test Service Description" - - loa_node = eidas_definition_node.find( - ".//saml:AuthnContextClassRef", - namespaces=NAMESPACES, - ) - assert loa_node.text == "urn:etoegang:core:assurance-class:loa3" - - makelaar_id_node = eidas_definition_node.find( - ".//esc:HerkenningsmakelaarId", - namespaces=NAMESPACES, - ) - assert makelaar_id_node.text == "00000003332221110000" - - entity_concerned_nodes = eidas_definition_node.findall( - ".//esc:EntityConcernedTypesAllowed", - namespaces=NAMESPACES, - ) - assert len(entity_concerned_nodes) == 1 - assert entity_concerned_nodes[0].text == "urn:etoegang:1.9:EntityConcernedID:Pseudo" - - # Service instances - service_instance_nodes = service_catalogue_node.findall( - ".//esc:ServiceInstance", - namespaces=NAMESPACES, - ) - assert len(service_instance_nodes) == 2 - - eherkenning_instance_node, eidas_instance_node = service_instance_nodes - - # Service instance eHerkenning - service_id_node = eherkenning_instance_node.find( - ".//esc:ServiceID", - namespaces=NAMESPACES, - ) - assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9050" - - service_url_node = eherkenning_instance_node.find( - ".//esc:ServiceURL", - namespaces=NAMESPACES, - ) - assert service_url_node.text == "http://test-entity.id" - - privacy_url_node = eherkenning_instance_node.find( - ".//esc:PrivacyPolicyURL", - namespaces=NAMESPACES, - ) - assert privacy_url_node.text == "http://test-privacy.nl" - - makelaar_id_node = eherkenning_instance_node.find( - ".//esc:HerkenningsmakelaarId", - namespaces=NAMESPACES, - ) - assert makelaar_id_node.text == "00000003332221110000" - - key_name_node = eherkenning_instance_node.find( - ".//ds:KeyName", - namespaces=NAMESPACES, - ) - assert key_name_node is not None - certificate_node = eherkenning_instance_node.find( - ".//ds:X509Certificate", - namespaces=NAMESPACES, - ) - assert certificate_node is not None - - classifier_node = eherkenning_instance_node.findall( - ".//esc:Classifier", - namespaces=NAMESPACES, - ) - assert len(classifier_node) == 0 - - # Service instance eIDAS - service_id_node = eidas_instance_node.find( - ".//esc:ServiceID", - namespaces=NAMESPACES, - ) - assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9051" - - service_url_node = eidas_instance_node.find( - ".//esc:ServiceURL", - namespaces=NAMESPACES, - ) - assert service_url_node.text == "http://test-entity.id" - - privacy_url_node = eidas_instance_node.find( - ".//esc:PrivacyPolicyURL", - namespaces=NAMESPACES, - ) - assert privacy_url_node.text == "http://test-privacy.nl" - - makelaar_id_node = eidas_instance_node.find( - ".//esc:HerkenningsmakelaarId", - namespaces=NAMESPACES, - ) - assert makelaar_id_node.text == "00000003332221110000" - - key_name_node = eidas_instance_node.find( - ".//ds:KeyName", - namespaces=NAMESPACES, - ) - assert key_name_node is not None - certificate_node = eidas_instance_node.find( - ".//ds:X509Certificate", - namespaces=NAMESPACES, - ) - assert certificate_node is not None - - classifier_node = eidas_instance_node.findall( - ".//esc:Classifier", - namespaces=NAMESPACES, - ) - assert len(classifier_node) == 1 - assert classifier_node[0].text == "eIDAS-inbound" - - -@pytest.mark.django_db -def test_missing_required_properties(): - with pytest.raises(CommandError): - call_command("generate_eherkenning_dienstcatalogus") - - -@pytest.mark.django_db -def test_no_eidas_service(temp_private_root): - stdout = StringIO() - - call_command( - "generate_eherkenning_dienstcatalogus", - "--no-save-config", - stdout=stdout, - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - organization_name="Test Organisation", - eh_attribute_consuming_service_index="9050", - no_eidas=True, - oin="00000001112223330000", - service_name="Test Service Name", - service_description="Test Service Description", - makelaar_id="00000003332221110000", - privacy_policy="http://test-privacy.nl", - test=True, - ) - - output = stdout.getvalue() - service_catalogue_node = etree.XML(output.encode("utf-8")) - - service_instance_nodes = service_catalogue_node.findall( - ".//esc:ServiceInstance", - namespaces=NAMESPACES, - ) - assert len(service_instance_nodes) == 1 - - eherkenning_instance_node = service_instance_nodes[0] - # Service instance eHerkenning - service_id_node = eherkenning_instance_node.find( - ".//esc:ServiceID", - namespaces=NAMESPACES, - ) - assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9050" - - service_url_node = eherkenning_instance_node.find( - ".//esc:ServiceURL", - namespaces=NAMESPACES, - ) - assert service_url_node.text == "http://test-entity.id" - - privacy_url_node = eherkenning_instance_node.find( - ".//esc:PrivacyPolicyURL", - namespaces=NAMESPACES, - ) - assert privacy_url_node.text == "http://test-privacy.nl" - - makelaar_id_node = eherkenning_instance_node.find( - ".//esc:HerkenningsmakelaarId", - namespaces=NAMESPACES, - ) - assert makelaar_id_node.text == "00000003332221110000" - - key_name_node = eherkenning_instance_node.find( - ".//ds:KeyName", - namespaces=NAMESPACES, - ) - assert key_name_node is not None - certificate_node = eherkenning_instance_node.find( - ".//ds:X509Certificate", - namespaces=NAMESPACES, - ) - assert certificate_node is not None - - classifier_node = eherkenning_instance_node.findall( - ".//esc:Classifier", - namespaces=NAMESPACES, - ) - assert len(classifier_node) == 0 diff --git a/tests/test_digid_metadata.py b/tests/test_digid_metadata.py index 3e52258..569537a 100644 --- a/tests/test_digid_metadata.py +++ b/tests/test_digid_metadata.py @@ -1,16 +1,13 @@ -from io import StringIO - -from django.core.management import CommandError, call_command from django.test import TestCase import pytest from lxml import etree -from privates.test import temp_private_root +from simple_certmanager.models import Certificate -from digid_eherkenning.models import DigidConfiguration +from digid_eherkenning.choices import ConfigTypes +from digid_eherkenning.models import ConfigCertificate, DigidConfiguration from digid_eherkenning.saml2.digid import generate_digid_metadata -from .conftest import DIGID_TEST_CERTIFICATE_FILE, DIGID_TEST_KEY_FILE from .mixins import DigidMetadataMixin NAME_SPACES = { @@ -19,329 +16,6 @@ } -@temp_private_root() -class DigidMetadataManagementCommandTests(TestCase): - def test_generate_metadata_all_options_specified(self): - stdout = StringIO() - - call_command( - "generate_digid_metadata", - "--no-save-config", - "--slo", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(DIGID_TEST_KEY_FILE), - cert_file=str(DIGID_TEST_CERTIFICATE_FILE), - signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - attribute_consuming_service_index="9050", - service_name="Test Service Name", - service_description="Test Service Description", - technical_contact_person_telephone="06123123123", - technical_contact_person_email="test@test.nl", - organization_name="Test organisation", - organization_url="http://test-organisation.nl", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - self.assertEqual( - "http://test-entity.id", entity_descriptor_node.attrib["entityID"] - ) - - sspo_descriptor_node = entity_descriptor_node.find( - ".//md:SPSSODescriptor", - namespaces=NAME_SPACES, - ) - - self.assertEqual("true", sspo_descriptor_node.attrib["AuthnRequestsSigned"]) - self.assertEqual("true", sspo_descriptor_node.attrib["WantAssertionsSigned"]) - - certificate_node = entity_descriptor_node.find( - ".//ds:X509Certificate", - namespaces=NAME_SPACES, - ) - self.assertIn( - "MIIC0DCCAbigAwIBAgIUEjGmfCGa1cOiTi+UKtDQVtySOHUwDQYJKoZIhvcNAQEL", - certificate_node.text, - ) - - signature_algorithm_node = entity_descriptor_node.find( - ".//ds:SignatureMethod", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - signature_algorithm_node.attrib["Algorithm"], - ) - - digest_algorithm_node = entity_descriptor_node.find( - ".//ds:DigestMethod", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://www.w3.org/2001/04/xmlenc#sha256", - digest_algorithm_node.attrib["Algorithm"], - ) - - assertion_consuming_service_node = entity_descriptor_node.find( - ".//md:AssertionConsumerService", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://test-entity.id/digid/acs/", - assertion_consuming_service_node.attrib["Location"], - ) - - attribute_consuming_service_node = entity_descriptor_node.find( - ".//md:AttributeConsumingService", - namespaces=NAME_SPACES, - ) - self.assertEqual("9050", attribute_consuming_service_node.attrib["index"]) - - service_name_node = entity_descriptor_node.find( - ".//md:ServiceName", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test Service Name", service_name_node.text) - - service_description_node = entity_descriptor_node.find( - ".//md:ServiceDescription", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test Service Description", service_description_node.text) - - organisation_name_node = entity_descriptor_node.find( - ".//md:OrganizationName", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test organisation", organisation_name_node.text) - - organisation_display_node = entity_descriptor_node.find( - ".//md:OrganizationDisplayName", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test organisation", organisation_display_node.text) - - organisation_url_node = entity_descriptor_node.find( - ".//md:OrganizationURL", - namespaces=NAME_SPACES, - ) - self.assertEqual("http://test-organisation.nl", organisation_url_node.text) - - contact_person_node = entity_descriptor_node.find( - ".//md:ContactPerson", - namespaces=NAME_SPACES, - ) - self.assertEqual("technical", contact_person_node.attrib["contactType"]) - - contact_email_node = entity_descriptor_node.find( - ".//md:EmailAddress", - namespaces=NAME_SPACES, - ) - self.assertEqual("test@test.nl", contact_email_node.text) - - contact_telephone_node = entity_descriptor_node.find( - ".//md:TelephoneNumber", - namespaces=NAME_SPACES, - ) - self.assertEqual("06123123123", contact_telephone_node.text) - - slo_nodes = entity_descriptor_node.findall( - ".//md:SingleLogoutService", - namespaces=NAME_SPACES, - ) - self.assertEqual(len(slo_nodes), 2) - slo_soap, slo_redirect = slo_nodes - self.assertEqual( - slo_soap.attrib["Location"], "http://test-entity.id/digid/slo/soap/" - ) - self.assertEqual( - slo_soap.attrib["Binding"], "urn:oasis:names:tc:SAML:2.0:bindings:SOAP" - ) - self.assertEqual( - slo_redirect.attrib["Location"], "http://test-entity.id/digid/slo/redirect/" - ) - self.assertEqual( - slo_redirect.attrib["Binding"], - "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", - ) - - def test_missing_required_properties(self): - expected_error = ( - "Error: the following arguments are required: --key-file, --cert-file, " - "--entity-id, --base-url, --service-name, --service-description, " - "--save-config/--no-save-config" - ) - - with self.assertRaisesMessage(CommandError, expected_error): - call_command("generate_digid_metadata") - - def test_contact_telephone_no_email(self): - stdout = StringIO() - - call_command( - "generate_digid_metadata", - "--no-save-config", - "--slo", - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(DIGID_TEST_KEY_FILE), - cert_file=str(DIGID_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - technical_contact_person_telephone="06123123123", - test=True, - stdout=stdout, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - contact_email_node = entity_descriptor_node.find( - ".//md:EmailAddress", - namespaces=NAME_SPACES, - ) - contact_telephone_node = entity_descriptor_node.find( - ".//md:TelephoneNumber", - namespaces=NAME_SPACES, - ) - - self.assertIsNone(contact_email_node) - self.assertIsNone(contact_telephone_node) - - def test_organisation_url_no_service(self): - stdout = StringIO() - - call_command( - "generate_digid_metadata", - "--no-save-config", - "--slo", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(DIGID_TEST_KEY_FILE), - cert_file=str(DIGID_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - organization_url="http://test-organisation.nl", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - organisation_name_node = entity_descriptor_node.find( - ".//md:OrganizationName", - namespaces=NAME_SPACES, - ) - organisation_display_node = entity_descriptor_node.find( - ".//md:OrganizationDisplayName", - namespaces=NAME_SPACES, - ) - organisation_url_node = entity_descriptor_node.find( - ".//md:OrganizationURL", - namespaces=NAME_SPACES, - ) - - self.assertIsNone(organisation_name_node) - self.assertIsNone(organisation_display_node) - self.assertIsNone(organisation_url_node) - - def test_slo_not_supported(self): - stdout = StringIO() - - call_command( - "generate_digid_metadata", - "--no-save-config", - "--no-slo", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(DIGID_TEST_KEY_FILE), - cert_file=str(DIGID_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - single_logout_service_node = entity_descriptor_node.find( - ".//md:SingleLogoutService", - namespaces=NAME_SPACES, - ) - self.assertIsNone(single_logout_service_node) - - def test_management_command_and_update_config(self): - stdout = StringIO() - assert not DigidConfiguration.objects.exists() - - call_command( - "generate_digid_metadata", - "--save-config", - "--want-assertions-encrypted", - "--no-only-assertions-signed", - ["--attribute-consuming-service-index", "1"], - key_file=str(DIGID_TEST_KEY_FILE), - cert_file=str(DIGID_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - stdout=stdout, - test=True, - ) - - self.assertTrue(DigidConfiguration.objects.exists()) - config = DigidConfiguration.get_solo() - self.assertTrue(config.want_assertions_encrypted) - self.assertFalse(config.want_assertions_signed) - self.assertEqual(config.service_name, "Test Service Name") - self.assertEqual(config.service_description, "Test Service Description") - self.assertEqual(config.attribute_consuming_service_index, "1") - - self.assertIsNotNone(config.certificate) - - with config.certificate.private_key.open("rb") as privkey: - with DIGID_TEST_KEY_FILE.open("rb") as source_privkey: - self.assertEqual(privkey.read(), source_privkey.read()) - - with config.certificate.public_certificate.open("rb") as cert: - with DIGID_TEST_CERTIFICATE_FILE.open("rb") as source_cert: - self.assertEqual(cert.read(), source_cert.read()) - - -@pytest.mark.django_db -def test_properties_in_db_config_not_required(digid_config): - """ - Assert that required properties already configured don't cause problems. - """ - digid_config.service_description = "CLI test" - digid_config.save() - try: - call_command( - "generate_digid_metadata", - "--no-save-config", - "--test", - stdout=StringIO(), - ) - except CommandError: - pytest.fail("Database configuration is valid for management commands.") - - @pytest.mark.usefixtures("digid_config", "temp_private_root") class DigidMetadataGenerationTests(DigidMetadataMixin, TestCase): def test_generate_metadata_all_options_specified(self): @@ -540,3 +214,55 @@ def test_slo_not_supported(self): namespaces=NAME_SPACES, ) self.assertIsNone(single_logout_service_node) + + +@pytest.mark.django_db +def test_current_and_next_certificate_in_metadata( + temp_private_root, + digid_config: DigidConfiguration, + digid_certificate: Certificate, + next_certificate: Certificate, +): + ConfigCertificate.objects.create( + config_type=ConfigTypes.digid, + certificate=next_certificate, + ) + assert ConfigCertificate.objects.count() == 2 # expect current and next + + digid_metadata = generate_digid_metadata() + + entity_descriptor_node = etree.XML(digid_metadata) + + metadata_node = entity_descriptor_node.find( + "md:SPSSODescriptor", namespaces=NAME_SPACES + ) + assert metadata_node is not None + key_nodes = metadata_node.findall("md:KeyDescriptor", namespaces=NAME_SPACES) + assert len(key_nodes) == 2 # we expect current + next key + key1_node, key2_node = key_nodes + assert key1_node.attrib["use"] == "signing" + assert key2_node.attrib["use"] == "signing" + + with ( + digid_certificate.public_certificate.open("r") as _current, + next_certificate.public_certificate.open("r") as _next, + ): + current_base64 = _current.read().replace("\n", "") + next_base64 = _next.read().replace("\n", "") + + # certificate nodes include only the base64 encoded PEM data, without header/footer + cert1_node = key1_node.find( + "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES + ) + assert cert1_node is not None + assert cert1_node.text is not None + assert (cert_data_1 := cert1_node.text.strip()) in current_base64 + + cert2_node = key2_node.find( + "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES + ) + assert cert2_node is not None + assert cert2_node.text is not None + assert (cert_data_2 := cert2_node.text.strip()) in next_base64 + # they should not be the same + assert cert_data_1 != cert_data_2 diff --git a/tests/test_eherkenning_metadata.py b/tests/test_eherkenning_metadata.py index 00c0f7b..c2fe4d9 100644 --- a/tests/test_eherkenning_metadata.py +++ b/tests/test_eherkenning_metadata.py @@ -1,19 +1,16 @@ -from io import StringIO - -from django.core.management import CommandError, call_command from django.test import TestCase import pytest from lxml import etree -from privates.test import temp_private_root +from simple_certmanager.models import Certificate -from digid_eherkenning.models import EherkenningConfiguration +from digid_eherkenning.choices import ConfigTypes +from digid_eherkenning.models import ConfigCertificate, EherkenningConfiguration from digid_eherkenning.saml2.eherkenning import ( eHerkenningClient, generate_eherkenning_metadata, ) -from .conftest import EHERKENNING_TEST_CERTIFICATE_FILE, EHERKENNING_TEST_KEY_FILE from .mixins import EherkenningMetadataMixin NAME_SPACES = { @@ -22,345 +19,6 @@ } -@temp_private_root() -class EHerkenningMetadataManagementCommandTests(TestCase): - def test_generate_metadata_all_options_specified(self): - stdout = StringIO() - - call_command( - "generate_eherkenning_metadata", - "--no-save-config", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - eh_attribute_consuming_service_index="9050", - eidas_attribute_consuming_service_index="9051", - oin="00000001112223330000", - service_name="Test Service Name", - service_description="Test Service Description", - technical_contact_person_telephone="06123123123", - technical_contact_person_email="test@test.nl", - organization_name="Test organisation", - organization_url="http://test-organisation.nl", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - self.assertEqual( - "http://test-entity.id", entity_descriptor_node.attrib["entityID"] - ) - - sspo_descriptor_node = entity_descriptor_node.find( - ".//md:SPSSODescriptor", - namespaces=NAME_SPACES, - ) - - self.assertEqual("true", sspo_descriptor_node.attrib["AuthnRequestsSigned"]) - self.assertEqual("true", sspo_descriptor_node.attrib["WantAssertionsSigned"]) - - certificate_node = entity_descriptor_node.find( - ".//ds:X509Certificate", - namespaces=NAME_SPACES, - ) - self.assertIn( - "MIIC0DCCAbigAwIBAgIUEjGmfCGa1cOiTi+UKtDQVtySOHUwDQYJKoZIhvcNAQEL", - certificate_node.text, - ) - - signature_algorithm_node = entity_descriptor_node.find( - ".//ds:SignatureMethod", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - signature_algorithm_node.attrib["Algorithm"], - ) - - digest_algorithm_node = entity_descriptor_node.find( - ".//ds:DigestMethod", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://www.w3.org/2001/04/xmlenc#sha256", - digest_algorithm_node.attrib["Algorithm"], - ) - - assertion_consuming_service_node = entity_descriptor_node.find( - ".//md:AssertionConsumerService", - namespaces=NAME_SPACES, - ) - self.assertEqual( - "http://test-entity.id/eherkenning/acs/", - assertion_consuming_service_node.attrib["Location"], - ) - - attribute_consuming_service_nodes = entity_descriptor_node.findall( - ".//md:AttributeConsumingService", - namespaces=NAME_SPACES, - ) - self.assertEqual(2, len(attribute_consuming_service_nodes)) - - eh_attribute_consuming_service_node = attribute_consuming_service_nodes[0] - eidas_attribute_consuming_service_node = attribute_consuming_service_nodes[1] - - self.assertEqual( - "urn:etoegang:DV:00000001112223330000:services:9050", - eh_attribute_consuming_service_node.find( - ".//md:RequestedAttribute", namespaces=NAME_SPACES - ).attrib["Name"], - ) - self.assertEqual( - "Test Service Name", - eh_attribute_consuming_service_node.find( - ".//md:ServiceName", namespaces=NAME_SPACES - ).text, - ) - self.assertEqual( - "Test Service Description", - eh_attribute_consuming_service_node.find( - ".//md:ServiceDescription", namespaces=NAME_SPACES - ).text, - ) - self.assertEqual( - "urn:etoegang:DV:00000001112223330000:services:9051", - eidas_attribute_consuming_service_node.find( - ".//md:RequestedAttribute", namespaces=NAME_SPACES - ).attrib["Name"], - ) - self.assertEqual( - "Test Service Name (eIDAS)", - eidas_attribute_consuming_service_node.find( - ".//md:ServiceName", namespaces=NAME_SPACES - ).text, - ) - self.assertEqual( - "Test Service Description", - eidas_attribute_consuming_service_node.find( - ".//md:ServiceDescription", namespaces=NAME_SPACES - ).text, - ) - - organisation_name_node = entity_descriptor_node.find( - ".//md:OrganizationName", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test organisation", organisation_name_node.text) - - organisation_display_node = entity_descriptor_node.find( - ".//md:OrganizationDisplayName", - namespaces=NAME_SPACES, - ) - self.assertEqual("Test organisation", organisation_display_node.text) - - organisation_url_node = entity_descriptor_node.find( - ".//md:OrganizationURL", - namespaces=NAME_SPACES, - ) - self.assertEqual("http://test-organisation.nl", organisation_url_node.text) - - contact_person_node = entity_descriptor_node.find( - ".//md:ContactPerson", - namespaces=NAME_SPACES, - ) - self.assertEqual("technical", contact_person_node.attrib["contactType"]) - - contact_email_node = entity_descriptor_node.find( - ".//md:EmailAddress", - namespaces=NAME_SPACES, - ) - self.assertEqual("test@test.nl", contact_email_node.text) - - contact_telephone_node = entity_descriptor_node.find( - ".//md:TelephoneNumber", - namespaces=NAME_SPACES, - ) - self.assertEqual("06123123123", contact_telephone_node.text) - - def test_missing_required_properties(self): - with self.assertRaises(CommandError): - call_command("generate_eherkenning_metadata") - - def test_contact_telephone_no_email(self): - stdout = StringIO() - - call_command( - "generate_eherkenning_metadata", - "--no-save-config", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - oin="00000001112223330000", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - technical_contact_person_telephone="06123123123", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - contact_email_node = entity_descriptor_node.find( - ".//md:EmailAddress", - namespaces=NAME_SPACES, - ) - contact_telephone_node = entity_descriptor_node.find( - ".//md:TelephoneNumber", - namespaces=NAME_SPACES, - ) - - self.assertIsNone(contact_email_node) - self.assertIsNone(contact_telephone_node) - - def test_organisation_url_no_service(self): - stdout = StringIO() - - call_command( - "generate_eherkenning_metadata", - "--no-save-config", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - oin="00000001112223330000", - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - organization_url="http://test-organisation.nl", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - organisation_name_node = entity_descriptor_node.find( - ".//md:OrganizationName", - namespaces=NAME_SPACES, - ) - organisation_display_node = entity_descriptor_node.find( - ".//md:OrganizationDisplayName", - namespaces=NAME_SPACES, - ) - organisation_url_node = entity_descriptor_node.find( - ".//md:OrganizationURL", - namespaces=NAME_SPACES, - ) - - self.assertIsNone(organisation_name_node) - self.assertIsNone(organisation_display_node) - self.assertIsNone(organisation_url_node) - - def test_no_eidas_service(self): - stdout = StringIO() - - call_command( - "generate_eherkenning_metadata", - "--no-save-config", - stdout=stdout, - want_assertions_encrypted=True, - want_assertions_signed=True, - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256", - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - eh_attribute_consuming_service_index="9050", - oin="00000001112223330000", - no_eidas=True, - service_name="Test Service Name", - service_description="Test Service Description", - technical_contact_person_telephone="06123123123", - technical_contact_person_email="test@test.nl", - organization_name="Test organisation", - organization_url="http://test-organisation.nl", - test=True, - ) - - output = stdout.getvalue() - entity_descriptor_node = etree.XML(output.encode("utf-8")) - - attribute_consuming_service_nodes = entity_descriptor_node.findall( - ".//md:AttributeConsumingService", - namespaces=NAME_SPACES, - ) - self.assertEqual(1, len(attribute_consuming_service_nodes)) - - eh_attribute_consuming_service_node = attribute_consuming_service_nodes[0] - - self.assertEqual( - "urn:etoegang:DV:00000001112223330000:services:9050", - eh_attribute_consuming_service_node.find( - ".//md:RequestedAttribute", namespaces=NAME_SPACES - ).attrib["Name"], - ) - self.assertEqual( - "Test Service Name", - eh_attribute_consuming_service_node.find( - ".//md:ServiceName", namespaces=NAME_SPACES - ).text, - ) - self.assertEqual( - "Test Service Description", - eh_attribute_consuming_service_node.find( - ".//md:ServiceDescription", namespaces=NAME_SPACES - ).text, - ) - - def test_management_command_and_update_config(self): - stdout = StringIO() - assert not EherkenningConfiguration.objects.exists() - - call_command( - "generate_eherkenning_metadata", - "--save-config", - "--want-assertions-encrypted", - "--no-only-assertions-signed", - ["--eh-attribute-consuming-service-index", "1"], - key_file=str(EHERKENNING_TEST_KEY_FILE), - cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE), - entity_id="http://test-entity.id", - base_url="http://test-entity.id", - service_name="Test Service Name", - service_description="Test Service Description", - oin="01234567890123456789", - stdout=stdout, - test=True, - ) - - self.assertTrue(EherkenningConfiguration.objects.exists()) - config = EherkenningConfiguration.get_solo() - self.assertTrue(config.want_assertions_encrypted) - self.assertFalse(config.want_assertions_signed) - self.assertEqual(config.oin, "01234567890123456789") - self.assertEqual(config.service_name, "Test Service Name") - self.assertEqual(config.service_description, "Test Service Description") - self.assertEqual(config.eh_attribute_consuming_service_index, "1") - - self.assertIsNotNone(config.certificate) - - with config.certificate.private_key.open("rb") as privkey: - with EHERKENNING_TEST_KEY_FILE.open("rb") as source_privkey: - self.assertEqual(privkey.read(), source_privkey.read()) - - with config.certificate.public_certificate.open("rb") as cert: - with EHERKENNING_TEST_CERTIFICATE_FILE.open("rb") as source_cert: - self.assertEqual(cert.read(), source_cert.read()) - - @pytest.mark.usefixtures("eherkenning_config", "temp_private_root") class EHerkenningClientTests(TestCase): def test_attribute_consuming_services_with_non_required_requested_attribute(self): @@ -714,3 +372,55 @@ def test_no_eidas_service(self): ".//md:ServiceDescription", namespaces=NAME_SPACES ).text, ) + + +@pytest.mark.django_db +def test_current_and_next_certificate_in_metadata( + temp_private_root, + eherkenning_config: EherkenningConfiguration, + eherkenning_certificate: Certificate, + next_certificate: Certificate, +): + ConfigCertificate.objects.create( + config_type=ConfigTypes.eherkenning, + certificate=next_certificate, + ) + assert ConfigCertificate.objects.count() == 2 # expect current and next + + eh_metadata = generate_eherkenning_metadata() + + entity_descriptor_node = etree.XML(eh_metadata) + + metadata_node = entity_descriptor_node.find( + "md:SPSSODescriptor", namespaces=NAME_SPACES + ) + assert metadata_node is not None + key_nodes = metadata_node.findall("md:KeyDescriptor", namespaces=NAME_SPACES) + assert len(key_nodes) == 2 # we expect current + next key + key1_node, key2_node = key_nodes + assert key1_node.attrib["use"] == "signing" + assert key2_node.attrib["use"] == "signing" + + with ( + eherkenning_certificate.public_certificate.open("r") as _current, + next_certificate.public_certificate.open("r") as _next, + ): + current_base64 = _current.read().replace("\n", "") + next_base64 = _next.read().replace("\n", "") + + # certificate nodes include only the base64 encoded PEM data, without header/footer + cert1_node = key1_node.find( + "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES + ) + assert cert1_node is not None + assert cert1_node.text is not None + assert (cert_data_1 := cert1_node.text.strip()) in current_base64 + + cert2_node = key2_node.find( + "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES + ) + assert cert2_node is not None + assert cert2_node.text is not None + assert (cert_data_2 := cert2_node.text.strip()) in next_base64 + # they should not be the same + assert cert_data_1 != cert_data_2 diff --git a/tests/test_eherkenning_views.py b/tests/test_eherkenning_views.py index ffb8774..7a45122 100644 --- a/tests/test_eherkenning_views.py +++ b/tests/test_eherkenning_views.py @@ -3,7 +3,7 @@ from unittest.mock import patch from django.conf import settings -from django.test import RequestFactory, TestCase, override_settings +from django.test import TestCase, override_settings from django.urls import reverse from django.utils import timezone @@ -14,9 +14,7 @@ from lxml import etree from onelogin.saml2.utils import OneLogin_Saml2_Utils -from digid_eherkenning.choices import AssuranceLevels from digid_eherkenning.models import EherkenningConfiguration -from digid_eherkenning.views import eHerkenningLoginView from .project.models import User from .utils import create_example_artifact, get_saml_element @@ -146,7 +144,8 @@ def setUp(self): config = EherkenningConfiguration.get_solo() - with config.certificate.public_certificate.open("r") as cert_file: + current_cert, _ = config.select_certificates() + with current_cert.public_certificate.open("r") as cert_file: cert = cert_file.read() encrypted_attribute = OneLogin_Saml2_Utils.generate_name_id( diff --git a/tests/test_migrations.py b/tests/test_migrations.py index b10086a..782bc7d 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -7,11 +7,10 @@ import pytest from cryptography.hazmat.primitives.asymmetric import rsa from simple_certmanager.constants import CertificateTypes -from simple_certmanager.models import Certificate from simple_certmanager.test.certificate_generation import key_to_pem from simple_certmanager.utils import load_pem_x509_private_key -from digid_eherkenning.choices import AssuranceLevels +from digid_eherkenning.choices import AssuranceLevels, ConfigTypes @pytest.mark.django_db() @@ -43,7 +42,7 @@ def test_fixing_misconfigured_eherkenning(migrator): @pytest.mark.django_db def test_decrypt_private_keys_with_passphrase( - migrator, encrypted_keypair: tuple[bytes, bytes] + temp_private_root, migrator, encrypted_keypair: tuple[bytes, bytes] ): old_state = migrator.apply_initial_migration( ("digid_eherkenning", "0008_update_loa_fields") @@ -163,6 +162,7 @@ def _decryption_skip_cases_idfn(case): ) @pytest.mark.django_db def test_decryption_migration_robustness( + temp_private_root, migrator, leaf_keypair: tuple[rsa.RSAPrivateKey, bytes], encrypted_keypair: tuple[bytes, bytes], @@ -202,3 +202,64 @@ def test_decryption_migration_robustness( ) except Exception: pytest.fail("Expected migration not to crash") + + +@pytest.mark.parametrize( + "model_name,config_type", + [ + ("DigidConfiguration", ConfigTypes.digid), + ("EherkenningConfiguration", ConfigTypes.eherkenning), + ], +) +def test_move_certificate( + temp_private_root, + migrator, + leaf_keypair, + model_name, + config_type, +): + key, cert_pem = leaf_keypair + key_pem = key_to_pem(key, passphrase="") + old_state = migrator.apply_initial_migration( + ( + "digid_eherkenning", + "0011_configcertificate_configcertificate_uniq_config_cert", + ) + ) + OldConfig = old_state.apps.get_model("digid_eherkenning", model_name) + Certificate = old_state.apps.get_model("simple_certmanager", "Certificate") + certificate = Certificate.objects.create( + label="Test certificate", + type=CertificateTypes.key_pair, + public_certificate=File(BytesIO(cert_pem), name="client_cert.pem"), + private_key=File(BytesIO(key_pem), name="client_key.pem"), + ) + OldConfig.objects.create(certificate=certificate) + + new_state = migrator.apply_tested_migration( + ("digid_eherkenning", "0012_move_config_certificate") + ) + + ConfigCertificate = new_state.apps.get_model( + "digid_eherkenning", "ConfigCertificate" + ) + instance = ConfigCertificate.objects.get(certificate=certificate.pk) + assert instance.config_type == config_type + + +def test_move_certificate_noop(migrator): + migrator.apply_initial_migration( + ( + "digid_eherkenning", + "0011_configcertificate_configcertificate_uniq_config_cert", + ) + ) + # there are no config records on a fresh install + new_state = migrator.apply_tested_migration( + ("digid_eherkenning", "0012_move_config_certificate") + ) + + ConfigCertificate = new_state.apps.get_model( + "digid_eherkenning", "ConfigCertificate" + ) + assert not ConfigCertificate.objects.exists()