diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index b1e4ed5..bee4c9b 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,19 @@
Changelog
=========
+0.17.0 (DEVELOPMENT)
+====================
+
+**💥⚠️ Breaking changes**
+
+* Removed the ``generate_digid_metadata``, ``generate_eherkenning_metadata`` and
+ ``generate_eherkenning_dienstcatalogus`` management commands. This metadata is
+ available through the admin interface and existing URLs/views.
+
+**Features**
+
+...
+
0.16.0 (2024-07-02)
===================
diff --git a/digid_eherkenning/admin.py b/digid_eherkenning/admin.py
index f7fdf5f..aa7d3d4 100644
--- a/digid_eherkenning/admin.py
+++ b/digid_eherkenning/admin.py
@@ -1,11 +1,16 @@
+from datetime import datetime
+
from django.contrib import admin
+from django.urls import reverse
+from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
from privates.admin import PrivateMediaMixin
from privates.widgets import PrivateFileWidget
from solo.admin import SingletonModelAdmin
-from .models import DigidConfiguration, EherkenningConfiguration
+from .models import ConfigCertificate, DigidConfiguration, EherkenningConfiguration
+from .models.base import BaseConfiguration
class CustomPrivateFileWidget(PrivateFileWidget):
@@ -16,14 +21,39 @@ class CustomPrivateMediaMixin(PrivateMediaMixin):
private_media_file_widget = CustomPrivateFileWidget
-@admin.register(DigidConfiguration)
-class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin):
- readonly_fields = ("idp_service_entity_id",)
- fieldsets = (
+class BaseAdmin(CustomPrivateMediaMixin, SingletonModelAdmin):
+ readonly_fields = (
+ "link_to_certificates",
+ "idp_service_entity_id",
+ )
+ private_media_fields = ("idp_metadata_file",)
+
+ @admin.display(description=_("certificates"))
+ def link_to_certificates(self, obj: BaseConfiguration) -> str:
+ path = reverse(
+ "admin:digid_eherkenning_configcertificate_changelist",
+ current_app=self.admin_site.name,
+ )
+ config_type = obj._as_config_type()
+ qs = ConfigCertificate.objects.filter(config_type=config_type)
+ url = f"{path}?config_type__exact={config_type}"
+ return format_html(
+ '{label}',
+ url=url,
+ config_type=config_type.value,
+ label=_("Manage ({count})").format(count=qs.count()),
+ )
+
+
+def _fieldset_factory(middle):
+ """
+ Output custom fieldsets (model-specific) between fixed shared field(set)s.
+ """
+ head = [
(
_("X.509 Certificate"),
{
- "fields": ("certificate",),
+ "fields": ("link_to_certificates",),
},
),
(
@@ -50,18 +80,8 @@ class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin):
),
},
),
- (
- _("Service details"),
- {
- "fields": (
- "service_name",
- "service_description",
- "requested_attributes",
- "attribute_consuming_service_index",
- "slo",
- ),
- },
- ),
+ ]
+ tail = [
(
_("Organization details"),
{
@@ -73,96 +93,103 @@ class DigidConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin):
),
},
),
+ ]
+
+ return tuple(head + list(middle) + tail)
+
+
+@admin.register(DigidConfiguration)
+class DigidConfigurationAdmin(BaseAdmin):
+ fieldsets = _fieldset_factory(
+ [
+ (
+ _("Service details"),
+ {
+ "fields": (
+ "service_name",
+ "service_description",
+ "requested_attributes",
+ "attribute_consuming_service_index",
+ "slo",
+ ),
+ },
+ ),
+ ]
)
change_form_template = "admin/digid_eherkenning/digidconfiguration/change_form.html"
- private_media_fields = ("idp_metadata_file",)
@admin.register(EherkenningConfiguration)
-class EherkenningConfigurationAdmin(CustomPrivateMediaMixin, SingletonModelAdmin):
- readonly_fields = ("idp_service_entity_id",)
- fieldsets = (
- (
- _("X.509 Certificate"),
- {
- "fields": ("certificate",),
- },
- ),
- (
- _("Identity provider"),
- {
- "fields": (
- "metadata_file_source",
- "idp_service_entity_id",
- "idp_metadata_file",
- ),
- },
- ),
- (
- _("SAML configuration"),
- {
- "fields": (
- "entity_id",
- "base_url",
- "artifact_resolve_content_type",
- "want_assertions_signed",
- "want_assertions_encrypted",
- "signature_algorithm",
- "digest_algorithm",
- ),
- },
- ),
- (
- _("Service details"),
- {
- "fields": (
- "service_name",
- "service_description",
- "oin",
- "makelaar_id",
- "privacy_policy",
- "service_language",
- ),
- },
- ),
- (
- _("eHerkenning"),
- {
- "fields": (
- "eh_requested_attributes",
- "eh_attribute_consuming_service_index",
- "eh_service_uuid",
- "eh_service_instance_uuid",
- "eh_loa",
- ),
- },
- ),
- (
- _("eIDAS"),
- {
- "fields": (
- "no_eidas",
- "eidas_requested_attributes",
- "eidas_attribute_consuming_service_index",
- "eidas_service_uuid",
- "eidas_service_instance_uuid",
- "eidas_loa",
- ),
- },
- ),
- (
- _("Organization details"),
- {
- "fields": (
- "technical_contact_person_telephone",
- "technical_contact_person_email",
- "organization_url",
- "organization_name",
- ),
- },
- ),
+class EherkenningConfigurationAdmin(BaseAdmin):
+ fieldsets = _fieldset_factory(
+ [
+ (
+ _("Service details"),
+ {
+ "fields": (
+ "service_name",
+ "service_description",
+ "oin",
+ "makelaar_id",
+ "privacy_policy",
+ "service_language",
+ ),
+ },
+ ),
+ (
+ _("eHerkenning"),
+ {
+ "fields": (
+ "eh_requested_attributes",
+ "eh_attribute_consuming_service_index",
+ "eh_service_uuid",
+ "eh_service_instance_uuid",
+ "eh_loa",
+ ),
+ },
+ ),
+ (
+ _("eIDAS"),
+ {
+ "fields": (
+ "no_eidas",
+ "eidas_requested_attributes",
+ "eidas_attribute_consuming_service_index",
+ "eidas_service_uuid",
+ "eidas_service_instance_uuid",
+ "eidas_loa",
+ ),
+ },
+ ),
+ ]
)
+
change_form_template = (
"admin/digid_eherkenning/eherkenningconfiguration/change_form.html"
)
- private_media_fields = ("idp_metadata_file",)
+
+
+@admin.register(ConfigCertificate)
+class ConfigCertificateAdmin(admin.ModelAdmin):
+ list_display = (
+ "config_type",
+ "certificate",
+ "valid_from",
+ "expiry_date",
+ "is_ready",
+ )
+ list_filter = ("config_type",)
+ search_fields = ("certificate__label",)
+ raw_id_fields = ("certificate",)
+
+ @admin.display(description=_("valid from"))
+ def valid_from(self, obj: ConfigCertificate) -> datetime:
+ return obj.certificate.valid_from
+
+ @admin.display(description=_("expires on"))
+ def expiry_date(self, obj: ConfigCertificate) -> datetime:
+ return obj.certificate.expiry_date
+
+ @admin.display(description=_("valid candidate?"), boolean=True)
+ def is_ready(self, obj: ConfigCertificate) -> bool:
+ return obj.is_ready_for_authn_requests
diff --git a/digid_eherkenning/choices.py b/digid_eherkenning/choices.py
index 60960c1..d2f9dda 100644
--- a/digid_eherkenning/choices.py
+++ b/digid_eherkenning/choices.py
@@ -4,6 +4,15 @@
from onelogin.saml2.constants import OneLogin_Saml2_Constants
+class ConfigTypes(models.TextChoices):
+ """
+ Maps a config type enum to a configuration model.
+ """
+
+ digid = "digid_eherkenning.DigidConfiguration", _("DigiD")
+ eherkenning = "digid_eherkenning.EherkenningConfiguration", _("eHerkenning")
+
+
class SectorType(models.TextChoices):
bsn = "s00000000", "BSN"
sofi = "s00000001", "SOFI"
diff --git a/digid_eherkenning/exceptions.py b/digid_eherkenning/exceptions.py
index f372f00..b04d372 100644
--- a/digid_eherkenning/exceptions.py
+++ b/digid_eherkenning/exceptions.py
@@ -8,3 +8,9 @@ class eHerkenningError(SAML2Error):
class eHerkenningNoRSINError(eHerkenningError):
pass
+
+
+class CertificateProblem(Exception):
+ def __init__(self, msg: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.message = msg
diff --git a/digid_eherkenning/management/commands/_base.py b/digid_eherkenning/management/commands/_base.py
deleted file mode 100644
index 9aa7c14..0000000
--- a/digid_eherkenning/management/commands/_base.py
+++ /dev/null
@@ -1,226 +0,0 @@
-from pathlib import Path
-from typing import Sequence, Type
-
-from django.core.files import File
-from django.core.management.base import BaseCommand
-from django.db import transaction
-
-from simple_certmanager.constants import CertificateTypes
-from simple_certmanager.models import Certificate
-
-from ...models.base import BaseConfiguration
-
-try:
- from argparse import BooleanOptionalAction
-except ImportError:
- from ..utils import BooleanOptionalAction
-
-
-class SamlMetadataBaseCommand(BaseCommand):
- config_model: Type[BaseConfiguration]
- default_certificate_label: str
-
- def add_arguments(self, parser):
- """
- Add arguments that map to configuration model fields.
-
- You can use a different flag, but then ensure the ``dest`` kwarg is specified.
- Options are applied to the specified configuration model instance if a model
- field with the same name as the option exists.
- """
- # check current config to determine if an option is required or not
- config = self._get_config()
- has_private_key = config.certificate and config.certificate.private_key
- has_certificate = config.certificate and config.certificate.public_certificate
-
- parser.add_argument(
- "--want-assertions-encrypted",
- action="store_true",
- help="If True the XML assertions need to be encrypted. Defaults to False",
- )
- parser.add_argument(
- "--no-only-assertions-signed",
- dest="want_assertions_signed",
- action="store_false",
- help=(
- "If True, the XML assertions need to be signed, otherwise the whole "
- "response needs to be signed. Defaults to only assertions signed."
- ),
- )
- parser.add_argument(
- "--key-file",
- required=not has_private_key,
- help=(
- "The filepath to the TLS key. This will be used both by the SOAP "
- "client and for signing the requests."
- ),
- )
- parser.add_argument(
- "--cert-file",
- required=not has_certificate,
- help=(
- "The filepath to the TLS certificate. This will be used both by the "
- "SOAP client and for signing the requests."
- ),
- )
- parser.add_argument(
- "--key-passphrase",
- help="Passphrase for SOAP client",
- default=None,
- )
- parser.add_argument(
- "--signature-algorithm",
- help="Signature algorithm, defaults to RSA_SHA1",
- default="http://www.w3.org/2000/09/xmldsig#rsa-sha1",
- )
- parser.add_argument(
- "--digest-algorithm",
- help="Digest algorithm, defaults to SHA1",
- default="http://www.w3.org/2000/09/xmldsig#sha1",
- )
- parser.add_argument(
- "--entity-id",
- required=not config.entity_id,
- help="Service provider entity ID",
- )
- parser.add_argument(
- "--base-url",
- required=not config.base_url,
- help="Base URL of the application",
- )
- parser.add_argument(
- "--service-name",
- required=not config.service_name,
- help="The name of the service for which DigiD login is required",
- )
- parser.add_argument(
- "--service-description",
- required=not config.service_description,
- help="A description of the service for which DigiD login is required",
- )
- parser.add_argument(
- "--technical-contact-person-telephone",
- help=(
- "Telephone number of the technical person responsible for this DigiD "
- "setup. For it to be used, --technical-contact-person-email should "
- "also be set."
- ),
- )
- parser.add_argument(
- "--technical-contact-person-email",
- help=(
- "Email address of the technical person responsible for this DigiD "
- "setup. For it to be used, --technical-contact-person-telephone "
- "should also be set."
- ),
- )
- parser.add_argument(
- "--organization-name",
- help=(
- "Name of the organisation providing the service for which DigiD login "
- "is setup. For it to be used, also --organization-url should be filled."
- ),
- )
- parser.add_argument(
- "--organization-url",
- help=(
- "URL of the organisation providing the service for which DigiD login "
- "is setup. For it to be used, also --organization-name should be "
- "filled."
- ),
- )
- parser.add_argument(
- "--output-file",
- help=(
- "Name of the file to which to write the metadata. Otherwise will be "
- "printed on stdout"
- ),
- )
- parser.add_argument(
- "--test",
- "--debug",
- action="store_true",
- help="If True the metadata is printed to stdout. Defaults to False",
- )
- parser.add_argument(
- "--save-config",
- action=BooleanOptionalAction,
- required=True,
- help="Save the configuration overrides specified via the command line.",
- )
-
- def get_filename(self) -> str: # pragma:nocover
- raise NotImplementedError
-
- def generate_metadata(self, options: dict) -> bytes: # pragma:nocover
- raise NotImplementedError
-
- def _get_config(self):
- if not hasattr(self, "_config"):
- self._config = self.config_model.get_solo()
- return self._config
-
- def _set_certificate(self, config: BaseConfiguration, options: dict):
- certificate = config.certificate
-
- # no certificate exists yet -> create one
- if certificate is None:
- certificate = Certificate.objects.create(
- label=self.default_certificate_label,
- type=CertificateTypes.key_pair,
- )
- config.certificate = certificate
-
- # enforce that the specified key/certificate are used
- for option, filefield in (
- ("key_file", "private_key"),
- ("cert_file", "public_certificate"),
- ):
- filepath = options[option]
- if not filepath:
- continue
-
- path = Path(filepath)
- with path.open("rb") as infile:
- field_file = getattr(certificate, filefield)
- field_file.save(path.name, File(infile), save=False)
-
- certificate.save()
-
- @transaction.atomic
- def _generate_metadata(self, options: dict) -> bytes:
- valid_field_names = [f.name for f in self.config_model._meta.get_fields()]
- config = self._get_config()
-
- self._set_certificate(config, options)
-
- for key, value in options.items():
- if key not in valid_field_names:
- continue
- # optional, unspecified -> go with the model default or current value
- if value is None:
- continue
- setattr(config, key, value)
-
- config.save()
-
- metadata = self.generate_metadata(options)
-
- transaction.set_rollback(not options["save_config"])
-
- return metadata
-
- def handle(self, *args, **options):
- metadata_content = self._generate_metadata(options)
-
- if options["test"]:
- self.stdout.write(metadata_content.decode("utf-8"))
- return
-
- filename = options["output_file"] or self.get_filename()
- with open(filename, "xb") as metadata_file:
- metadata_file.write(metadata_content)
-
- self.stdout.write(
- self.style.SUCCESS("Metadata file successfully generated: %s" % filename)
- )
diff --git a/digid_eherkenning/management/commands/generate_digid_metadata.py b/digid_eherkenning/management/commands/generate_digid_metadata.py
deleted file mode 100644
index 3353031..0000000
--- a/digid_eherkenning/management/commands/generate_digid_metadata.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from django.utils import timezone
-
-from ...models import DigidConfiguration
-from ...saml2.digid import generate_digid_metadata
-from ._base import SamlMetadataBaseCommand
-
-try:
- from argparse import BooleanOptionalAction
-except ImportError:
- from ..utils import BooleanOptionalAction
-
-
-class Command(SamlMetadataBaseCommand):
- help = "Create the DigiD metadata file"
- config_model = DigidConfiguration
- default_certificate_label = "DigiD"
-
- def add_arguments(self, parser):
- super().add_arguments(parser)
-
- config: DigidConfiguration = self._get_config()
-
- parser.add_argument(
- "--slo",
- default=config.slo,
- action=BooleanOptionalAction,
- help="If '--slo' is present, Single Logout is supported. To turn it off use '--no-slo'",
- )
- parser.add_argument(
- "--attribute-consuming-service-index",
- type=str,
- help="Attribute consuming service index, defaults to 1",
- default="1",
- )
-
- def get_filename(self):
- date_string = timezone.now().date().isoformat()
- return f"digid-metadata-{date_string}.xml"
-
- def generate_metadata(self, options):
- return generate_digid_metadata()
diff --git a/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py b/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py
deleted file mode 100644
index 60a7a29..0000000
--- a/digid_eherkenning/management/commands/generate_eherkenning_dienstcatalogus.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from django.utils import timezone
-
-from ...models import EherkenningConfiguration
-from ...saml2.eherkenning import generate_dienst_catalogus_metadata
-from .generate_eherkenning_metadata import Command as EherkenningCommand
-
-
-def _remove_action_by_dest(parser, dest: str):
- for action in parser._actions:
- if action.dest != dest:
- continue
- parser._remove_action(action)
- break
-
- for action in parser._action_groups:
- for group_action in action._group_actions:
- if group_action.dest != dest:
- continue
- action._group_actions.remove(group_action)
- return
-
-
-class Command(EherkenningCommand):
- help = "Create the eHerkenning dienstcatalogus file"
-
- def add_arguments(self, parser):
- super().add_arguments(parser)
-
- # delete arguments that we don't use
- dests_to_delete = [
- "want_assertions_encrypted",
- "want_assertions_signed",
- "technical_contact_person_telephone",
- "technical_contact_person_email",
- "organization_url",
- ]
- # remove actions not relevant for this command, but still re-use the bulk
- # from the eherkenning metadata generation command
- for dest in dests_to_delete:
- _remove_action_by_dest(parser, dest)
-
- config: EherkenningConfiguration = self._get_config()
-
- parser.add_argument(
- "--privacy-policy",
- required=not config.privacy_policy,
- help=(
- "The URL where the privacy policy from the organisation providing the "
- "service can be found."
- ),
- )
- parser.add_argument(
- "--makelaar-id",
- required=not config.makelaar_id,
- help="OIN of the broker used to set up eHerkenning/eIDAS.",
- )
-
- def get_filename(self):
- date_string = timezone.now().date().isoformat()
- return f"eherkenning-dienstcatalogus-{date_string}.xml"
-
- def generate_metadata(self, options):
- return generate_dienst_catalogus_metadata()
diff --git a/digid_eherkenning/management/commands/generate_eherkenning_metadata.py b/digid_eherkenning/management/commands/generate_eherkenning_metadata.py
deleted file mode 100644
index 9995ed7..0000000
--- a/digid_eherkenning/management/commands/generate_eherkenning_metadata.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from django.utils import timezone
-
-from ...models import EherkenningConfiguration
-from ...saml2.eherkenning import generate_eherkenning_metadata
-from ._base import SamlMetadataBaseCommand
-
-
-class Command(SamlMetadataBaseCommand):
- help = "Create the eHerkenning metadata file"
- config_model = EherkenningConfiguration
- default_certificate_label = "eHerkenning/eIDAS"
-
- def add_arguments(self, parser):
- super().add_arguments(parser)
-
- config: EherkenningConfiguration = self._get_config()
-
- parser.add_argument(
- "--loa",
- help="Level of Assurance (LoA) to use for all the services.",
- default="urn:etoegang:core:assurance-class:loa3",
- )
- parser.add_argument(
- "--eh-attribute-consuming-service-index",
- help="Attribute consuming service index for the eHerkenning service, defaults to 9052",
- default="9052",
- )
- parser.add_argument(
- "--eidas-attribute-consuming-service-index",
- help="Attribute consuming service index for the eHerkenning service, defaults to 9053",
- default="9053",
- )
- parser.add_argument(
- "--oin",
- required=not config.oin,
- default=config.oin,
- help="The OIN of the company providing the service.",
- )
- parser.add_argument(
- "--no-eidas",
- action="store_true",
- help="If True, then the service catalogue will contain only the eHerkenning service. Defaults to False.",
- default=False,
- )
-
- def get_filename(self):
- date_string = timezone.now().date().isoformat()
- return f"eherkenning-metadata-{date_string}.xml"
-
- def generate_metadata(self, options):
- return generate_eherkenning_metadata()
diff --git a/digid_eherkenning/management/commands/update_stored_metadata.py b/digid_eherkenning/management/commands/update_stored_metadata.py
index 78f3831..4c09518 100644
--- a/digid_eherkenning/management/commands/update_stored_metadata.py
+++ b/digid_eherkenning/management/commands/update_stored_metadata.py
@@ -1,7 +1,11 @@
from django.core.management import BaseCommand
-from digid_eherkenning.models.digid import DigidConfiguration
-from digid_eherkenning.models.eherkenning import EherkenningConfiguration
+from ...models import DigidConfiguration, EherkenningConfiguration
+
+MODEL_MAP = {
+ "digid": DigidConfiguration,
+ "eherkenning": EherkenningConfiguration,
+}
class Command(BaseCommand):
@@ -11,15 +15,13 @@ def add_arguments(self, parser):
parser.add_argument(
"config_model",
type=str,
- choices=["digid", "eherkenning"],
+ choices=list(MODEL_MAP.keys()),
help="Update the DigiD or Eherkenning configuration metadata.",
)
def handle(self, **options):
- if options["config_model"] == "digid":
- config = DigidConfiguration.get_solo()
- elif options["config_model"] == "eherkenning":
- config = EherkenningConfiguration.get_solo()
+ config_model = MODEL_MAP[options["config_model"]]
+ config = config_model.get_solo()
if config.metadata_file_source:
config.save(force_metadata_update=True)
diff --git a/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py b/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py
new file mode 100644
index 0000000..932e468
--- /dev/null
+++ b/digid_eherkenning/migrations/0011_configcertificate_configcertificate_uniq_config_cert.py
@@ -0,0 +1,65 @@
+# Generated by Django 4.2.13 on 2024-07-19 10:53
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("simple_certmanager", "0002_alter_certificate_private_key_and_more"),
+ ("digid_eherkenning", "0010_remove_digidconfiguration_key_passphrase_and_more"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="ConfigCertificate",
+ fields=[
+ (
+ "id",
+ models.AutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ (
+ "config_type",
+ models.CharField(
+ choices=[
+ ("digid_eherkenning.DigidConfiguration", "DigiD"),
+ (
+ "digid_eherkenning.EherkenningConfiguration",
+ "eHerkenning",
+ ),
+ ],
+ max_length=100,
+ verbose_name="config type",
+ ),
+ ),
+ (
+ "certificate",
+ models.ForeignKey(
+ help_text="Certificate that may be used by the specified configuration. The best matching candidate will automatically be selected by the configuration.",
+ limit_choices_to={"type": "key_pair"},
+ on_delete=django.db.models.deletion.PROTECT,
+ to="simple_certmanager.certificate",
+ verbose_name="certificate",
+ ),
+ ),
+ ],
+ options={
+ "verbose_name": "DigiD/eHerkenning certificate",
+ "verbose_name_plural": "DigiD/eHerkenning certificates",
+ },
+ ),
+ migrations.AddConstraint(
+ model_name="configcertificate",
+ constraint=models.UniqueConstraint(
+ fields=("config_type", "certificate"),
+ name="uniq_config_cert",
+ violation_error_message="This configuration and certificate combination already exists.",
+ ),
+ ),
+ ]
diff --git a/digid_eherkenning/migrations/0012_move_config_certificate.py b/digid_eherkenning/migrations/0012_move_config_certificate.py
new file mode 100644
index 0000000..3e1231e
--- /dev/null
+++ b/digid_eherkenning/migrations/0012_move_config_certificate.py
@@ -0,0 +1,47 @@
+# Generated by Django 4.2.13 on 2024-07-19 12:34
+
+from django.db import migrations
+
+from ..choices import ConfigTypes
+
+
+def move_certificates(apps, _):
+ DigidConfiguration = apps.get_model("digid_eherkenning", "DigidConfiguration")
+ EherkenningConfiguration = apps.get_model(
+ "digid_eherkenning", "EherkenningConfiguration"
+ )
+ ConfigCertificate = apps.get_model("digid_eherkenning", "ConfigCertificate")
+
+ for model in (DigidConfiguration, EherkenningConfiguration):
+ config = model.objects.first()
+ if config is None or not (cert := config.certificate):
+ continue
+ ConfigCertificate.objects.get_or_create(
+ certificate=cert,
+ config_type=ConfigTypes(
+ f"{config._meta.app_label}.{config._meta.object_name}"
+ ),
+ )
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ (
+ "digid_eherkenning",
+ "0011_configcertificate_configcertificate_uniq_config_cert",
+ ),
+ ]
+
+ operations = [
+ # reverse migration is ambiguous, if needed, you can easily use the UI
+ migrations.RunPython(move_certificates, migrations.RunPython.noop),
+ migrations.RemoveField(
+ model_name="digidconfiguration",
+ name="certificate",
+ ),
+ migrations.RemoveField(
+ model_name="eherkenningconfiguration",
+ name="certificate",
+ ),
+ ]
diff --git a/digid_eherkenning/models/__init__.py b/digid_eherkenning/models/__init__.py
index 31dfd85..b46fd90 100644
--- a/digid_eherkenning/models/__init__.py
+++ b/digid_eherkenning/models/__init__.py
@@ -1,4 +1,9 @@
+from .certificates import ConfigCertificate
from .digid import DigidConfiguration
from .eherkenning import EherkenningConfiguration
-__all__ = ["DigidConfiguration", "EherkenningConfiguration"]
+__all__ = [
+ "DigidConfiguration",
+ "EherkenningConfiguration",
+ "ConfigCertificate",
+]
diff --git a/digid_eherkenning/models/base.py b/digid_eherkenning/models/base.py
index 75bc4c3..ddda5ca 100644
--- a/digid_eherkenning/models/base.py
+++ b/digid_eherkenning/models/base.py
@@ -10,26 +10,17 @@
from simple_certmanager.models import Certificate
from solo.models import SingletonModel
-from ..choices import DigestAlgorithms, SignatureAlgorithms, XMLContentTypes
-
-
-class ConfigurationManager(models.Manager):
- def get_queryset(self):
- qs = super().get_queryset()
- return qs.select_related("certificate")
+from ..choices import (
+ ConfigTypes,
+ DigestAlgorithms,
+ SignatureAlgorithms,
+ XMLContentTypes,
+)
+from ..exceptions import CertificateProblem
+from .certificates import ConfigCertificate
class BaseConfiguration(SingletonModel):
- certificate = models.ForeignKey(
- Certificate,
- null=True,
- on_delete=models.PROTECT,
- verbose_name=_("key pair"),
- help_text=_(
- "The private key and public certificate pair to use during the "
- "authentication flow."
- ),
- )
idp_metadata_file = PrivateMediaFileField(
_("identity provider metadata"),
blank=True,
@@ -167,8 +158,6 @@ class BaseConfiguration(SingletonModel):
max_length=100,
)
- objects = ConfigurationManager()
-
class Meta:
abstract = True
@@ -240,6 +229,34 @@ def save(self, *args, **kwargs):
super().save(*args, **kwargs)
def clean(self):
- if not self.certificate:
- raise ValidationError(_("You must select a certificate"))
super().clean()
+
+ # require that a certificate is configured
+ if not ConfigCertificate.objects.for_config(self).exists():
+ raise ValidationError(
+ _(
+ "You must prepare at least one certificate for the {verbose_name}."
+ ).format(verbose_name=self._meta.verbose_name)
+ )
+
+ @classmethod
+ def _as_config_type(cls) -> ConfigTypes:
+ opts = cls._meta
+ return ConfigTypes(f"{opts.app_label}.{opts.object_name}")
+
+ def select_certificates(self) -> tuple[Certificate, Certificate | None]:
+ try:
+ current_cert, next_cert = ConfigCertificate.objects.for_config(
+ self
+ ).select_certificates()
+ except ConfigCertificate.DoesNotExist as exc:
+ raise CertificateProblem(
+ "No (valid) certificate configured. The configuration needs a "
+ "certificate with private key and public certificate."
+ ) from exc
+ else:
+ # type checker shanigans, mostly
+ assert isinstance(current_cert, Certificate)
+ assert next_cert is None or isinstance(next_cert, Certificate)
+
+ return current_cert, next_cert
diff --git a/digid_eherkenning/models/certificates.py b/digid_eherkenning/models/certificates.py
new file mode 100644
index 0000000..af6df00
--- /dev/null
+++ b/digid_eherkenning/models/certificates.py
@@ -0,0 +1,201 @@
+"""
+Functionality to relate one or more certificates to a SAMLv2 configuration.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, TypeAlias
+
+from django.db import models
+from django.utils import timezone
+from django.utils.translation import gettext_lazy as _
+
+from simple_certmanager.constants import CertificateTypes
+from simple_certmanager.models import Certificate
+
+from ..choices import ConfigTypes
+
+if TYPE_CHECKING:
+ from .digid import DigidConfiguration
+ from .eherkenning import EherkenningConfiguration
+
+logger = logging.getLogger(__name__)
+
+_AnyDigiD: TypeAlias = "type[DigidConfiguration] | DigidConfiguration"
+_AnyEH: TypeAlias = "type[EherkenningConfiguration] | EherkenningConfiguration"
+
+
+class ConfigCertificateQuerySet(models.QuerySet["ConfigCertificate"]):
+ def for_config(self, config: _AnyDigiD | _AnyEH):
+ config_type = config._as_config_type()
+ return self.filter(config_type=config_type)
+
+ def select_certificates(self) -> tuple[Certificate, Certificate | None]:
+ """
+ Select the best candidates for the current and next certificate.
+
+ The certificates are used for signing authentication requests (and metadata)
+ itself, and the possible certificates that perform signing are included in the
+ metadata. For zero-downtime/gradual replacement, a current and next certificate
+ can be provided (this is a limitation in python3-saml).
+
+ We look for the current certificate and the next with the following algorithm:
+
+ * order candidates by valid_from, so we favour existing/the oldest keypairs
+ * order candidates by expiry date, so if they have an identical valid_from, we
+ favour the one that will expiry first (the other one(s) automatically become
+ the next certificate
+ * discard any candidates that do not meet our key pair requirements, ignoring
+ valid_from/until
+
+ To determine the current certificate:
+
+ * discard candidates that are not valid yet
+ * discard candiates that are not valid anymore
+
+ If no candidate matches, we raise a DoesNotExist exception.
+
+ If a candidate is found, we select the next certificate according to:
+
+ * must be valid_from >= current_certificate.valid_from
+ * must not be expired
+ """
+ # XXX: check if this has a big performance impact because we extract the
+ # valid_from/until by loading the certificate files!
+
+ qs = self.filter(certificate__type=CertificateTypes.key_pair).iterator()
+ # first pass - filter out anything that is not usable for SAML flows (
+ # discarding broken/invalid configurations)
+ candidates = [
+ candidate
+ for candidate in qs
+ if candidate._meets_requirements_to_be_used_for_saml()
+ ]
+ # sort them - we now know that we can safely access the valid_from and
+ # expiry_date attributes
+ candidates = sorted(
+ candidates,
+ key=lambda c: (c.certificate.valid_from, c.certificate.expiry_date),
+ )
+
+ # figure out which certificate is our current certificate
+ current_cert: Certificate | None = None
+ next_cert: Certificate | None = None
+
+ # loop only once, so that we are certain next_cert's validity is *after* current
+ # cert.
+ for candidate in candidates:
+ certificate: Certificate = candidate.certificate
+ match (current_cert, next_cert):
+ case (None, None) if candidate.is_ready_for_authn_requests:
+ current_cert = certificate
+ continue # the same candidate cannot both be current and next
+ case (Certificate(), None) if certificate.expiry_date > timezone.now():
+ next_cert = certificate
+ break # we found both current and next
+ else:
+ logger.debug("Could not determine a next certificate")
+
+ if current_cert is None:
+ raise self.model.DoesNotExist(
+ "Could not find a suitable current certificate"
+ )
+
+ return current_cert, next_cert
+
+
+class ConfigCertificateManager(models.Manager.from_queryset(ConfigCertificateQuerySet)):
+ def get_queryset(self):
+ qs = super().get_queryset()
+ return qs.select_related("certificate")
+
+
+class ConfigCertificate(models.Model):
+ """
+ Tie a particular certificate to a configuration model.
+ """
+
+ config_type = models.CharField(
+ _("config type"),
+ max_length=100,
+ choices=ConfigTypes.choices,
+ )
+ certificate = models.ForeignKey(
+ Certificate,
+ on_delete=models.PROTECT,
+ # Careful! This does not give any guarantees, you can select a valid certificate
+ # and then make the certificate instance itself invalid, and end up with a
+ # cert-only configuration.
+ limit_choices_to={"type": CertificateTypes.key_pair},
+ verbose_name=_("certificate"),
+ help_text=_(
+ "Certificate that may be used by the specified configuration. The best "
+ "matching candidate will automatically be selected by the configuration."
+ ),
+ )
+
+ objects = ConfigCertificateManager()
+
+ class Meta:
+ verbose_name = _("DigiD/eHerkenning certificate")
+ verbose_name_plural = _("DigiD/eHerkenning certificates")
+ constraints = [
+ models.UniqueConstraint(
+ name="uniq_config_cert",
+ fields=("config_type", "certificate"),
+ violation_error_message=_(
+ "This configuration and certificate combination already exists."
+ ),
+ ),
+ ]
+
+ def __str__(self):
+ config_type = self.get_config_type_display() # type: ignore
+ _cert = self.certificate if self.certificate_id else None # type: ignore
+ certificate = str(_cert) if _cert else _("(no certificate selected)")
+ return f"{config_type}: {certificate}"
+
+ def _meets_requirements_to_be_used_for_saml(self) -> bool:
+ try:
+ _certificate: Certificate = self.certificate
+ except Certificate.DoesNotExist:
+ return False
+
+ if _certificate.type != CertificateTypes.key_pair:
+ return False
+
+ if not (privkey := _certificate.private_key) or not privkey.storage.exists(
+ privkey.name
+ ):
+ return False
+
+ # Try loading it with cryptography
+ try:
+ _certificate.certificate
+ except (FileNotFoundError, ValueError) as exc:
+ logger.info(
+ "Could not introspect certificate validity",
+ exc_info=exc,
+ extra={"certificate_pk": _certificate.pk},
+ )
+ return False
+
+ return True
+
+ @property
+ def is_ready_for_authn_requests(self) -> bool:
+ """
+ Introspect the certificate to determine if it's a candidate for authn requests.
+ """
+ if not self._meets_requirements_to_be_used_for_saml():
+ return False
+
+ _certificate: Certificate = self.certificate
+ valid_from, expiry_date = _certificate.valid_from, _certificate.expiry_date
+
+ now = timezone.now()
+ if not (valid_from <= now <= expiry_date):
+ return False
+
+ return True
diff --git a/digid_eherkenning/models/digid.py b/digid_eherkenning/models/digid.py
index 53166fc..66eadc9 100644
--- a/digid_eherkenning/models/digid.py
+++ b/digid_eherkenning/models/digid.py
@@ -1,4 +1,3 @@
-from django.core.exceptions import ImproperlyConfigured
from django.db import models
from django.utils.translation import gettext_lazy as _
@@ -45,15 +44,7 @@ def as_dict(self) -> dict:
"""
organization = None
- if (
- not self.certificate
- or not self.certificate.private_key
- or not self.certificate.public_certificate
- ):
- raise ImproperlyConfigured(
- "No (valid) certificate configured. The configuration needs a "
- "certificate with private key and public certificate."
- )
+ current_cert, next_cert = self.select_certificates()
if self.organization_url and self.organization_name:
organization = {
@@ -68,8 +59,9 @@ def as_dict(self) -> dict:
"base_url": self.base_url,
"entity_id": self.entity_id,
"metadata_file": self.idp_metadata_file,
- "key_file": self.certificate.private_key,
- "cert_file": self.certificate.public_certificate,
+ "key_file": current_cert.private_key,
+ "cert_file": current_cert.public_certificate,
+ "next_cert_file": next_cert.public_certificate if next_cert else None,
"service_entity_id": self.idp_service_entity_id,
"attribute_consuming_service_index": self.attribute_consuming_service_index,
"service_name": self.service_name,
diff --git a/digid_eherkenning/models/eherkenning.py b/digid_eherkenning/models/eherkenning.py
index 15c10ef..e67cda1 100644
--- a/digid_eherkenning/models/eherkenning.py
+++ b/digid_eherkenning/models/eherkenning.py
@@ -1,6 +1,5 @@
import uuid
-from django.core.exceptions import ImproperlyConfigured
from django.db import models
from django.utils.translation import gettext_lazy as _
@@ -198,15 +197,7 @@ def as_dict(self) -> EHerkenningConfig:
"""
organization = None
- if (
- not self.certificate
- or not self.certificate.private_key
- or not self.certificate.public_certificate
- ):
- raise ImproperlyConfigured(
- "No (valid) certificate configured. The configuration needs a "
- "certificate with private key and public certificate."
- )
+ current_cert, next_cert = self.select_certificates()
if self.organization_url and self.organization_name:
organization = {
@@ -279,8 +270,9 @@ def as_dict(self) -> EHerkenningConfig:
"base_url": self.base_url,
"entity_id": self.entity_id,
"metadata_file": self.idp_metadata_file,
- "key_file": self.certificate.private_key,
- "cert_file": self.certificate.public_certificate,
+ "key_file": current_cert.private_key,
+ "cert_file": current_cert.public_certificate,
+ "next_cert_file": next_cert.public_certificate if next_cert else None,
"service_entity_id": self.idp_service_entity_id,
"oin": self.oin,
"services": services,
diff --git a/digid_eherkenning/saml2/base.py b/digid_eherkenning/saml2/base.py
index de5d74c..15baa93 100644
--- a/digid_eherkenning/saml2/base.py
+++ b/digid_eherkenning/saml2/base.py
@@ -280,11 +280,23 @@ def create_config_dict(self, conf):
],
},
"NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified",
+ # Used for:
+ # * signing the metadata
+ # * signing authentication requests
"x509cert": certificate,
+ # Used for:
+ # * signing the metadata
+ # * signing authentication requests
"privateKey": privkey,
},
}
+ # Used to provide the next certificate to be used for signing in the
+ # metadata so that the IDP can prepare.
+ if next_cert_file := conf.get("next_cert_file"):
+ with next_cert_file.open("r") as _next_cert_file:
+ setting_dict["sp"]["x509certNew"] = _next_cert_file.read()
+
# check if we need to add the idp
metadata_file = conf["metadata_file"]
if metadata_file:
diff --git a/digid_eherkenning/saml2/eherkenning.py b/digid_eherkenning/saml2/eherkenning.py
index 57f48da..5f5f96a 100644
--- a/digid_eherkenning/saml2/eherkenning.py
+++ b/digid_eherkenning/saml2/eherkenning.py
@@ -1,7 +1,7 @@
import binascii
from base64 import b64encode
from io import BytesIO
-from typing import Union
+from typing import Union, no_type_check
from uuid import uuid4
from django.urls import reverse
@@ -9,7 +9,6 @@
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import load_pem_x509_certificate
-from furl.furl import furl
from lxml.builder import ElementMaker
from lxml.etree import Element, tostring
from onelogin.saml2.settings import OneLogin_Saml2_Settings
@@ -465,35 +464,17 @@ def conf(self) -> EHerkenningConfig:
self._conf.setdefault("acs_path", reverse("eherkenning:acs"))
return self._conf
+ @no_type_check # my editor has more red than the red wedding in GOT
def create_config_dict(self, conf: EHerkenningConfig) -> EHerkenningSAMLConfig:
config_dict: EHerkenningSAMLConfig = super().create_config_dict(conf)
+ sp_config = config_dict["sp"]
+
+ # we have multiple services, so delete the config for the "single service" variant
attribute_consuming_services = create_attribute_consuming_services(conf)
- with (
- conf["cert_file"].open("r") as cert_file,
- conf["key_file"].open("r") as key_file,
- ):
- certificate = cert_file.read()
- privkey = key_file.read()
- acs_url = furl(conf["base_url"]) / conf["acs_path"]
- config_dict.update(
- {
- "sp": {
- # Identifier of the SP entity (must be a URI)
- "entityId": conf["entity_id"],
- # Specifies info about where and how the message MUST be
- # returned to the requester, in this case our SP.
- "assertionConsumerService": {
- "url": acs_url.url,
- "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact",
- },
- "attributeConsumingServices": attribute_consuming_services,
- "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified",
- "x509cert": certificate,
- "privateKey": privkey,
- },
- }
- )
+ del sp_config["attributeConsumingService"]
+ sp_config["attributeConsumingServices"] = attribute_consuming_services
+
return config_dict
def create_config(
diff --git a/digid_eherkenning/types.py b/digid_eherkenning/types.py
index c8d12b8..36ef8c8 100644
--- a/digid_eherkenning/types.py
+++ b/digid_eherkenning/types.py
@@ -1,6 +1,8 @@
from pathlib import Path
from typing import Optional, TypedDict, Union
+from django.db.models.fields.files import FieldFile
+
class ServiceConfig(TypedDict):
service_uuid: str
@@ -25,8 +27,8 @@ class EHerkenningConfig(TypedDict):
acs_path: str
entity_id: str
metadata_file: str
- cert_file: Path
- key_file: Path
+ cert_file: Path | FieldFile
+ key_file: Path | FieldFile
service_entity_id: str
oin: str
services: list[ServiceConfig]
diff --git a/pyproject.toml b/pyproject.toml
index ee015c5..59e0bff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,7 +30,7 @@ dependencies = [
"cryptography>=40.0.0",
"django>=4.2.0",
"django-sessionprofile",
- "django-simple-certmanager>=2.2.0",
+ "django-simple-certmanager>=2.3.0",
"django-solo",
"lxml>=4.7.1",
"furl",
diff --git a/tests/conftest.py b/tests/conftest.py
index 32c0bfc..3964af4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,13 +1,21 @@
+from io import BytesIO
from pathlib import Path
from django.core.files import File
import pytest
import responses
+from cryptography.hazmat.primitives.asymmetric import rsa
from simple_certmanager.constants import CertificateTypes
from simple_certmanager.models import Certificate
+from simple_certmanager.test.certificate_generation import key_to_pem
-from digid_eherkenning.models import DigidConfiguration, EherkenningConfiguration
+from digid_eherkenning.choices import ConfigTypes
+from digid_eherkenning.models import (
+ ConfigCertificate,
+ DigidConfiguration,
+ EherkenningConfiguration,
+)
BASE_DIR = Path(__file__).parent.resolve()
@@ -72,7 +80,7 @@ def digid_certificate(temp_private_root) -> Certificate:
DIGID_TEST_KEY_FILE.open("rb") as privkey,
DIGID_TEST_CERTIFICATE_FILE.open("rb") as cert,
):
- certificate, created = Certificate.objects.get_or_create(
+ certificate, _ = Certificate.objects.get_or_create(
label="DigiD Tests",
defaults={"type": CertificateTypes.key_pair},
)
@@ -84,8 +92,11 @@ def digid_certificate(temp_private_root) -> Certificate:
@pytest.fixture
def digid_config_defaults(digid_certificate, temp_private_root):
config = DigidConfiguration.get_solo()
- if config.certificate != digid_certificate:
- config.certificate = digid_certificate
+ # set up certificate
+ ConfigCertificate.objects.filter(config_type=ConfigTypes.digid).delete()
+ ConfigCertificate.objects.create(
+ config_type=ConfigTypes.digid, certificate=digid_certificate
+ )
with DIGID_TEST_METADATA_FILE.open("rb") as metadata_file:
config.idp_metadata_file.save("metadata", File(metadata_file), save=False)
config.save()
@@ -109,7 +120,7 @@ def eherkenning_certificate(temp_private_root) -> Certificate:
EHERKENNING_TEST_KEY_FILE.open("rb") as privkey,
EHERKENNING_TEST_CERTIFICATE_FILE.open("rb") as cert,
):
- certificate, created = Certificate.objects.get_or_create(
+ certificate, _ = Certificate.objects.get_or_create(
label="eHerkenning Tests",
defaults={"type": CertificateTypes.key_pair},
)
@@ -121,8 +132,10 @@ def eherkenning_certificate(temp_private_root) -> Certificate:
@pytest.fixture
def eherkenning_config_defaults(eherkenning_certificate):
config = EherkenningConfiguration.get_solo()
- if config.certificate != eherkenning_certificate:
- config.certificate = eherkenning_certificate
+ ConfigCertificate.objects.filter(config_type=ConfigTypes.eherkenning).delete()
+ ConfigCertificate.objects.create(
+ config_type=ConfigTypes.eherkenning, certificate=eherkenning_certificate
+ )
with EHERKENNING_TEST_METADATA_FILE.open("rb") as metadata_file:
config.idp_metadata_file.save("metadata", File(metadata_file), save=False)
config.save()
@@ -144,3 +157,19 @@ def eherkenning_config(eherkenning_config_defaults):
def mocked_responses():
with responses.RequestsMock() as rsps:
yield rsps
+
+
+@pytest.fixture
+def next_certificate(leaf_keypair: tuple[rsa.RSAPrivateKey, bytes]) -> Certificate:
+ """
+ Generate a key + certificate pair valid from timezone.now().
+ """
+ key, cert_pem = leaf_keypair
+ key_pem = key_to_pem(key)
+ certificate = Certificate.objects.create(
+ label="Next certificate",
+ type=CertificateTypes.key_pair,
+ public_certificate=File(BytesIO(cert_pem), name="public_certificate.pem"),
+ private_key=File(BytesIO(key_pem), name="private_key.pem"),
+ )
+ return certificate
diff --git a/tests/test_admin.py b/tests/test_admin.py
new file mode 100644
index 0000000..6f8b456
--- /dev/null
+++ b/tests/test_admin.py
@@ -0,0 +1,36 @@
+from django.test import Client
+from django.urls import reverse
+from django.utils.translation import gettext as _
+
+import pytest
+from pytest_django.asserts import assertContains
+
+from digid_eherkenning.models import DigidConfiguration, EherkenningConfiguration
+
+
+@pytest.mark.django_db
+def test_digid_configuration_admin_certificates_link(
+ admin_client: Client,
+ digid_config: DigidConfiguration,
+):
+ url = reverse("admin:digid_eherkenning_digidconfiguration_change", args=(1,))
+
+ response = admin_client.get(url)
+
+ assert response.status_code == 200
+ assertContains(response, _("certificates"))
+ assertContains(response, _("Manage ({count})").format(count=1))
+
+
+@pytest.mark.django_db
+def test_eherkenning_configuration_admin_certificates_link(
+ admin_client: Client,
+ eherkenning_config: EherkenningConfiguration,
+):
+ url = reverse("admin:digid_eherkenning_eherkenningconfiguration_change", args=(1,))
+
+ response = admin_client.get(url)
+
+ assert response.status_code == 200
+ assertContains(response, _("certificates"))
+ assertContains(response, _("Manage ({count})").format(count=1))
diff --git a/tests/test_certificate_admin.py b/tests/test_certificate_admin.py
new file mode 100644
index 0000000..c9c9aea
--- /dev/null
+++ b/tests/test_certificate_admin.py
@@ -0,0 +1,22 @@
+from django.test import Client
+from django.urls import reverse
+
+from pytest_django.asserts import assertContains
+
+from digid_eherkenning.choices import ConfigTypes
+from digid_eherkenning.models import ConfigCertificate
+
+
+def test_admin_changelist(admin_client: Client, digid_certificate):
+ url = reverse("admin:digid_eherkenning_configcertificate_changelist")
+ digid_certificate.label = "DigiD certificate"
+ digid_certificate.save()
+ ConfigCertificate.objects.create(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ response = admin_client.get(url)
+
+ assert response.status_code == 200
+ assertContains(response, "DigiD certificate")
diff --git a/tests/test_certificate_models.py b/tests/test_certificate_models.py
new file mode 100644
index 0000000..3bb226d
--- /dev/null
+++ b/tests/test_certificate_models.py
@@ -0,0 +1,296 @@
+from datetime import datetime, timedelta
+from io import BytesIO
+
+from django.core.files import File
+from django.utils import timezone
+
+import pytest
+from cryptography import x509
+from freezegun import freeze_time
+from simple_certmanager.constants import CertificateTypes
+from simple_certmanager.models import Certificate
+from simple_certmanager.test.certificate_generation import (
+ cert_to_pem,
+ gen_key,
+ key_to_pem,
+ mkcert,
+)
+
+from digid_eherkenning.choices import ConfigTypes
+from digid_eherkenning.exceptions import CertificateProblem
+from digid_eherkenning.models import ConfigCertificate
+from digid_eherkenning.models.digid import DigidConfiguration
+from digid_eherkenning.models.eherkenning import EherkenningConfiguration
+
+pytestmark = [pytest.mark.django_db]
+
+
+def test_valid_certificate(temp_private_root, digid_certificate):
+ # note that this test will start failing once the certificates expire IRL (!)
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ assert config_certificate.is_ready_for_authn_requests
+
+
+@freeze_time("2099-01-01")
+def test_expired_certificate(temp_private_root, digid_certificate):
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+@freeze_time("1700-01-01")
+def test_certificate_not_valid_yet(temp_private_root, digid_certificate):
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+def test_certificate_file_missing():
+ # can happen if the infrastructure has an oopsie...
+ certificate = Certificate.objects.create(
+ type=CertificateTypes.key_pair,
+ public_certificate="bad/filepath/cert.pem",
+ )
+ assert certificate.public_certificate.path
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=certificate,
+ )
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+def test_instance_without_certificate_provided():
+ config_certificate = ConfigCertificate(config_type=ConfigTypes.digid)
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+def test_certificate_wrong_type(temp_private_root, digid_certificate):
+ digid_certificate.type = CertificateTypes.cert_only
+ digid_certificate.save()
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+@pytest.mark.parametrize("path", ("", "missing/dir/bad-key-path.pem"))
+def test_private_key_missing(temp_private_root, digid_certificate, path):
+ digid_certificate.private_key = path
+ digid_certificate.save()
+ config_certificate = ConfigCertificate(
+ config_type=ConfigTypes.digid,
+ certificate=digid_certificate,
+ )
+
+ assert not config_certificate.is_ready_for_authn_requests
+
+
+def test_string_representation(settings, digid_certificate):
+ settings.LANGUAGE_CODE = "en"
+ digid_certificate.label = "SAML"
+ cc1 = ConfigCertificate(
+ config_type=ConfigTypes.digid, certificate=digid_certificate
+ )
+ assert str(cc1) == "DigiD: SAML"
+
+ cc2 = ConfigCertificate(config_type=ConfigTypes.eherkenning, certificate=None)
+ assert str(cc2) == "eHerkenning: (no certificate selected)"
+
+
+# Helpers for multiple certificates - can't call fixtures multiple times to get
+# different outcomes.
+
+
+def _generate_config_certificate(
+ request: pytest.FixtureRequest,
+ config_type: ConfigTypes,
+ valid_from: datetime,
+) -> Certificate:
+ request.getfixturevalue("temp_private_root")
+ subject = x509.Name(
+ [
+ x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "NL"),
+ x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "Some-State"),
+ x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "OpenGem"),
+ x509.NameAttribute(x509.NameOID.COMMON_NAME, "widgits.example.org"),
+ ]
+ )
+ root_cert = request.getfixturevalue("root_cert")
+ root_key = request.getfixturevalue("root_key")
+
+ key = gen_key()
+ # hack to work around the hardcoded timezone.now() usage in the mkcert helper
+ assert valid_from.tzinfo is not None, "Thou shall not use naive datetimes"
+ with freeze_time(valid_from):
+ cert = mkcert(
+ subject=subject,
+ subject_key=key,
+ issuer=root_cert,
+ issuer_key=root_key,
+ can_issue=False,
+ )
+
+ cert_pem, key_pem = cert_to_pem(cert), key_to_pem(key)
+ certificate = Certificate.objects.create(
+ label=f"Certificate, {valid_from.isoformat()}",
+ type=CertificateTypes.key_pair,
+ public_certificate=File(BytesIO(cert_pem), name="public_certificate.pem"),
+ private_key=File(BytesIO(key_pem), name="private_key.pem"),
+ )
+ ConfigCertificate.objects.create(config_type=config_type, certificate=certificate)
+ return certificate
+
+
+@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration))
+def test_certificate_selection_picks_correct(
+ request: pytest.FixtureRequest,
+ model: type[DigidConfiguration] | type[EherkenningConfiguration],
+):
+ config = model.get_solo()
+ config_type = config._as_config_type()
+ # expired
+ _generate_config_certificate(
+ request, config_type, valid_from=timezone.now() - timedelta(days=5)
+ )
+ # currently valid
+ _current_cert = _generate_config_certificate(
+ request, config_type, valid_from=timezone.now()
+ )
+ # valid tomorrow
+ _next_cert = _generate_config_certificate(
+ request,
+ config_type,
+ valid_from=timezone.now() + timedelta(days=1),
+ )
+
+ current_cert, next_cert = config.select_certificates()
+
+ assert current_cert == _current_cert
+ assert next_cert == _next_cert
+
+
+@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration))
+def test_certificate_selection_picks_correct_2(
+ request: pytest.FixtureRequest,
+ model: type[DigidConfiguration] | type[EherkenningConfiguration],
+):
+ config = model.get_solo()
+ config_type = config._as_config_type()
+ # expired
+ _generate_config_certificate(
+ request, config_type, valid_from=timezone.now() - timedelta(days=5)
+ )
+ # currently valid
+ _old_current = _generate_config_certificate(
+ request, config_type, valid_from=timezone.now()
+ )
+ # valid tomorrow
+ _new_current = _generate_config_certificate(
+ request,
+ config_type,
+ valid_from=timezone.now() + timedelta(days=1),
+ )
+
+ # "current" has now expired
+ with freeze_time(timezone.now() + timedelta(days=1, hours=1)):
+ assert timezone.now() > _old_current.expiry_date
+ current_cert, next_cert = config.select_certificates()
+
+ assert current_cert == _new_current
+ assert next_cert is None
+
+
+@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration))
+def test_no_current_certificate(
+ request: pytest.FixtureRequest,
+ model: type[DigidConfiguration] | type[EherkenningConfiguration],
+):
+ config = model.get_solo()
+ config_type = config._as_config_type()
+ # expired
+ _generate_config_certificate(
+ request, config_type, valid_from=timezone.now() - timedelta(days=5)
+ )
+
+ with pytest.raises(CertificateProblem):
+ config.select_certificates()
+
+
+@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration))
+def test_skips_invalid_certificates_for_current(
+ request: pytest.FixtureRequest,
+ model: type[DigidConfiguration] | type[EherkenningConfiguration],
+):
+ config = model.get_solo()
+ config_type = config._as_config_type()
+ now = timezone.now()
+
+ # all are currently valid - but we'll introduce problems
+ (c1, c2, c3, c4, c5) = [
+ _generate_config_certificate(request, config_type, valid_from=now)
+ for _ in range(0, 5)
+ ]
+ # must be keypair
+ c1.type = CertificateTypes.cert_only
+ c1.save()
+ c2.private_key = ""
+ c2.save()
+ c3.public_certificate = ""
+ c3.save()
+ c4.public_certificate.storage.delete(c4.public_certificate.name)
+ c5.private_key.storage.delete(c5.public_certificate.name)
+ # this one is okay
+ _current = _generate_config_certificate(request, config_type, valid_from=now)
+
+ current_cert, next_cert = config.select_certificates()
+
+ assert current_cert == _current
+ assert next_cert is None
+
+
+@pytest.mark.parametrize("model", (DigidConfiguration, EherkenningConfiguration))
+def test_skips_invalid_certificates_for_next(
+ request: pytest.FixtureRequest,
+ model: type[DigidConfiguration] | type[EherkenningConfiguration],
+):
+ config = model.get_solo()
+ config_type = config._as_config_type()
+ now = timezone.now()
+
+ # all are currently valid - but we'll introduce problems
+ (c1, c2, c3, c4, c5) = [
+ _generate_config_certificate(
+ request, config_type, valid_from=now + timedelta(days=1)
+ )
+ for _ in range(0, 5)
+ ]
+ # must be keypair
+ c1.type = CertificateTypes.cert_only
+ c1.save()
+ c2.private_key = ""
+ c2.save()
+ c3.public_certificate = ""
+ c3.save()
+ c4.public_certificate.storage.delete(c4.public_certificate.name)
+ c5.private_key.storage.delete(c5.public_certificate.name)
+ # this one is okay
+ _current = _generate_config_certificate(request, config_type, valid_from=now)
+
+ current_cert, next_cert = config.select_certificates()
+
+ assert current_cert == _current
+ assert next_cert is None
diff --git a/tests/test_dienst_catalogus_creation.py b/tests/test_dienst_catalogus_creation.py
index 054b6c7..7e1e7b7 100644
--- a/tests/test_dienst_catalogus_creation.py
+++ b/tests/test_dienst_catalogus_creation.py
@@ -1,6 +1,3 @@
-from io import StringIO
-
-from django.core.management import CommandError, call_command
from django.test import TestCase
import pytest
@@ -13,7 +10,6 @@
generate_dienst_catalogus_metadata,
)
-from .conftest import EHERKENNING_TEST_CERTIFICATE_FILE, EHERKENNING_TEST_KEY_FILE
from .mixins import EherkenningMetadataMixin
NAMESPACES = {
@@ -274,7 +270,7 @@ def test_catalogus_with_requested_attributes_without_purpose_statement(
@pytest.mark.django_db
def test_makelaar_oin_is_configurable(eherkenning_config_defaults, temp_private_root):
config = EherkenningConfiguration.get_solo()
- config.organisation_name = "Example"
+ config.organization_name = "Example"
config.service_name = "Example"
config.oin = "00000000000000000000"
config.makelaar_id = "00000000000000000123"
@@ -679,331 +675,3 @@ def test_no_eidas_service(self):
namespaces=NAMESPACES,
)
self.assertEqual(0, len(classifier_node))
-
-
-@pytest.mark.django_db
-def test_generate_metadata_all_options_specified(temp_private_root):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_dienstcatalogus",
- "--no-save-config",
- stdout=stdout,
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- organization_name="Test Organisation",
- eh_attribute_consuming_service_index="9050",
- eidas_attribute_consuming_service_index="9051",
- oin="00000001112223330000",
- service_name="Test Service Name",
- service_description="Test Service Description",
- makelaar_id="00000003332221110000",
- privacy_policy="http://test-privacy.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- service_catalogue_node = etree.XML(output.encode("utf-8"))
-
- signature_algorithm_node = service_catalogue_node.find(
- ".//ds:SignatureMethod",
- namespaces=NAMESPACES,
- )
- assert (
- signature_algorithm_node.attrib["Algorithm"]
- == "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
- )
-
- digest_algorithm_node = service_catalogue_node.find(
- ".//ds:DigestMethod",
- namespaces=NAMESPACES,
- )
- assert (
- digest_algorithm_node.attrib["Algorithm"]
- == "http://www.w3.org/2001/04/xmlenc#sha256"
- )
-
- # Service Provider
- service_provider_id_node = service_catalogue_node.find(
- ".//esc:ServiceProviderID",
- namespaces=NAMESPACES,
- )
- assert service_provider_id_node.text == "00000001112223330000"
-
- oganisation_display_node = service_catalogue_node.find(
- ".//esc:OrganizationDisplayName",
- namespaces=NAMESPACES,
- )
- assert oganisation_display_node.text == "Test Organisation"
-
- # Services
- service_definition_nodes = service_catalogue_node.findall(
- ".//esc:ServiceDefinition",
- namespaces=NAMESPACES,
- )
- assert len(service_definition_nodes) == 2
-
- eherkenning_definition_node, eidas_definition_node = service_definition_nodes
-
- # eHerkenning service definition
- uuid_node = eherkenning_definition_node.find(
- ".//esc:ServiceUUID",
- namespaces=NAMESPACES,
- )
- assert uuid_node is not None
-
- service_name_node = eherkenning_definition_node.find(
- ".//esc:ServiceName",
- namespaces=NAMESPACES,
- )
- assert service_name_node.text == "Test Service Name"
-
- service_description_node = eherkenning_definition_node.find(
- ".//esc:ServiceDescription",
- namespaces=NAMESPACES,
- )
- assert service_description_node.text == "Test Service Description"
-
- loa_node = eherkenning_definition_node.find(
- ".//saml:AuthnContextClassRef",
- namespaces=NAMESPACES,
- )
- assert loa_node.text == "urn:etoegang:core:assurance-class:loa3"
-
- makelaar_id_node = eherkenning_definition_node.find(
- ".//esc:HerkenningsmakelaarId",
- namespaces=NAMESPACES,
- )
- assert makelaar_id_node.text == "00000003332221110000"
-
- entity_concerned_nodes = eherkenning_definition_node.findall(
- ".//esc:EntityConcernedTypesAllowed",
- namespaces=NAMESPACES,
- )
- assert len(entity_concerned_nodes) == 3
- assert entity_concerned_nodes[0].attrib["setNumber"] == "1"
- assert entity_concerned_nodes[0].text == "urn:etoegang:1.9:EntityConcernedID:RSIN"
- assert entity_concerned_nodes[1].attrib["setNumber"] == "1"
- assert entity_concerned_nodes[1].text == "urn:etoegang:1.9:EntityConcernedID:KvKnr"
- assert entity_concerned_nodes[2].attrib["setNumber"] == "2"
- assert entity_concerned_nodes[2].text == "urn:etoegang:1.9:EntityConcernedID:KvKnr"
-
- # eIDAS service definition
- uuid_node = eidas_definition_node.find(
- ".//esc:ServiceUUID",
- namespaces=NAMESPACES,
- )
- assert uuid_node is not None
-
- service_name_node = eidas_definition_node.find(
- ".//esc:ServiceName",
- namespaces=NAMESPACES,
- )
- assert service_name_node.text == "Test Service Name (eIDAS)"
-
- service_description_node = eidas_definition_node.find(
- ".//esc:ServiceDescription",
- namespaces=NAMESPACES,
- )
- assert service_description_node.text == "Test Service Description"
-
- loa_node = eidas_definition_node.find(
- ".//saml:AuthnContextClassRef",
- namespaces=NAMESPACES,
- )
- assert loa_node.text == "urn:etoegang:core:assurance-class:loa3"
-
- makelaar_id_node = eidas_definition_node.find(
- ".//esc:HerkenningsmakelaarId",
- namespaces=NAMESPACES,
- )
- assert makelaar_id_node.text == "00000003332221110000"
-
- entity_concerned_nodes = eidas_definition_node.findall(
- ".//esc:EntityConcernedTypesAllowed",
- namespaces=NAMESPACES,
- )
- assert len(entity_concerned_nodes) == 1
- assert entity_concerned_nodes[0].text == "urn:etoegang:1.9:EntityConcernedID:Pseudo"
-
- # Service instances
- service_instance_nodes = service_catalogue_node.findall(
- ".//esc:ServiceInstance",
- namespaces=NAMESPACES,
- )
- assert len(service_instance_nodes) == 2
-
- eherkenning_instance_node, eidas_instance_node = service_instance_nodes
-
- # Service instance eHerkenning
- service_id_node = eherkenning_instance_node.find(
- ".//esc:ServiceID",
- namespaces=NAMESPACES,
- )
- assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9050"
-
- service_url_node = eherkenning_instance_node.find(
- ".//esc:ServiceURL",
- namespaces=NAMESPACES,
- )
- assert service_url_node.text == "http://test-entity.id"
-
- privacy_url_node = eherkenning_instance_node.find(
- ".//esc:PrivacyPolicyURL",
- namespaces=NAMESPACES,
- )
- assert privacy_url_node.text == "http://test-privacy.nl"
-
- makelaar_id_node = eherkenning_instance_node.find(
- ".//esc:HerkenningsmakelaarId",
- namespaces=NAMESPACES,
- )
- assert makelaar_id_node.text == "00000003332221110000"
-
- key_name_node = eherkenning_instance_node.find(
- ".//ds:KeyName",
- namespaces=NAMESPACES,
- )
- assert key_name_node is not None
- certificate_node = eherkenning_instance_node.find(
- ".//ds:X509Certificate",
- namespaces=NAMESPACES,
- )
- assert certificate_node is not None
-
- classifier_node = eherkenning_instance_node.findall(
- ".//esc:Classifier",
- namespaces=NAMESPACES,
- )
- assert len(classifier_node) == 0
-
- # Service instance eIDAS
- service_id_node = eidas_instance_node.find(
- ".//esc:ServiceID",
- namespaces=NAMESPACES,
- )
- assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9051"
-
- service_url_node = eidas_instance_node.find(
- ".//esc:ServiceURL",
- namespaces=NAMESPACES,
- )
- assert service_url_node.text == "http://test-entity.id"
-
- privacy_url_node = eidas_instance_node.find(
- ".//esc:PrivacyPolicyURL",
- namespaces=NAMESPACES,
- )
- assert privacy_url_node.text == "http://test-privacy.nl"
-
- makelaar_id_node = eidas_instance_node.find(
- ".//esc:HerkenningsmakelaarId",
- namespaces=NAMESPACES,
- )
- assert makelaar_id_node.text == "00000003332221110000"
-
- key_name_node = eidas_instance_node.find(
- ".//ds:KeyName",
- namespaces=NAMESPACES,
- )
- assert key_name_node is not None
- certificate_node = eidas_instance_node.find(
- ".//ds:X509Certificate",
- namespaces=NAMESPACES,
- )
- assert certificate_node is not None
-
- classifier_node = eidas_instance_node.findall(
- ".//esc:Classifier",
- namespaces=NAMESPACES,
- )
- assert len(classifier_node) == 1
- assert classifier_node[0].text == "eIDAS-inbound"
-
-
-@pytest.mark.django_db
-def test_missing_required_properties():
- with pytest.raises(CommandError):
- call_command("generate_eherkenning_dienstcatalogus")
-
-
-@pytest.mark.django_db
-def test_no_eidas_service(temp_private_root):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_dienstcatalogus",
- "--no-save-config",
- stdout=stdout,
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- organization_name="Test Organisation",
- eh_attribute_consuming_service_index="9050",
- no_eidas=True,
- oin="00000001112223330000",
- service_name="Test Service Name",
- service_description="Test Service Description",
- makelaar_id="00000003332221110000",
- privacy_policy="http://test-privacy.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- service_catalogue_node = etree.XML(output.encode("utf-8"))
-
- service_instance_nodes = service_catalogue_node.findall(
- ".//esc:ServiceInstance",
- namespaces=NAMESPACES,
- )
- assert len(service_instance_nodes) == 1
-
- eherkenning_instance_node = service_instance_nodes[0]
- # Service instance eHerkenning
- service_id_node = eherkenning_instance_node.find(
- ".//esc:ServiceID",
- namespaces=NAMESPACES,
- )
- assert service_id_node.text == "urn:etoegang:DV:00000001112223330000:services:9050"
-
- service_url_node = eherkenning_instance_node.find(
- ".//esc:ServiceURL",
- namespaces=NAMESPACES,
- )
- assert service_url_node.text == "http://test-entity.id"
-
- privacy_url_node = eherkenning_instance_node.find(
- ".//esc:PrivacyPolicyURL",
- namespaces=NAMESPACES,
- )
- assert privacy_url_node.text == "http://test-privacy.nl"
-
- makelaar_id_node = eherkenning_instance_node.find(
- ".//esc:HerkenningsmakelaarId",
- namespaces=NAMESPACES,
- )
- assert makelaar_id_node.text == "00000003332221110000"
-
- key_name_node = eherkenning_instance_node.find(
- ".//ds:KeyName",
- namespaces=NAMESPACES,
- )
- assert key_name_node is not None
- certificate_node = eherkenning_instance_node.find(
- ".//ds:X509Certificate",
- namespaces=NAMESPACES,
- )
- assert certificate_node is not None
-
- classifier_node = eherkenning_instance_node.findall(
- ".//esc:Classifier",
- namespaces=NAMESPACES,
- )
- assert len(classifier_node) == 0
diff --git a/tests/test_digid_metadata.py b/tests/test_digid_metadata.py
index 3e52258..569537a 100644
--- a/tests/test_digid_metadata.py
+++ b/tests/test_digid_metadata.py
@@ -1,16 +1,13 @@
-from io import StringIO
-
-from django.core.management import CommandError, call_command
from django.test import TestCase
import pytest
from lxml import etree
-from privates.test import temp_private_root
+from simple_certmanager.models import Certificate
-from digid_eherkenning.models import DigidConfiguration
+from digid_eherkenning.choices import ConfigTypes
+from digid_eherkenning.models import ConfigCertificate, DigidConfiguration
from digid_eherkenning.saml2.digid import generate_digid_metadata
-from .conftest import DIGID_TEST_CERTIFICATE_FILE, DIGID_TEST_KEY_FILE
from .mixins import DigidMetadataMixin
NAME_SPACES = {
@@ -19,329 +16,6 @@
}
-@temp_private_root()
-class DigidMetadataManagementCommandTests(TestCase):
- def test_generate_metadata_all_options_specified(self):
- stdout = StringIO()
-
- call_command(
- "generate_digid_metadata",
- "--no-save-config",
- "--slo",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(DIGID_TEST_KEY_FILE),
- cert_file=str(DIGID_TEST_CERTIFICATE_FILE),
- signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- attribute_consuming_service_index="9050",
- service_name="Test Service Name",
- service_description="Test Service Description",
- technical_contact_person_telephone="06123123123",
- technical_contact_person_email="test@test.nl",
- organization_name="Test organisation",
- organization_url="http://test-organisation.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- self.assertEqual(
- "http://test-entity.id", entity_descriptor_node.attrib["entityID"]
- )
-
- sspo_descriptor_node = entity_descriptor_node.find(
- ".//md:SPSSODescriptor",
- namespaces=NAME_SPACES,
- )
-
- self.assertEqual("true", sspo_descriptor_node.attrib["AuthnRequestsSigned"])
- self.assertEqual("true", sspo_descriptor_node.attrib["WantAssertionsSigned"])
-
- certificate_node = entity_descriptor_node.find(
- ".//ds:X509Certificate",
- namespaces=NAME_SPACES,
- )
- self.assertIn(
- "MIIC0DCCAbigAwIBAgIUEjGmfCGa1cOiTi+UKtDQVtySOHUwDQYJKoZIhvcNAQEL",
- certificate_node.text,
- )
-
- signature_algorithm_node = entity_descriptor_node.find(
- ".//ds:SignatureMethod",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- signature_algorithm_node.attrib["Algorithm"],
- )
-
- digest_algorithm_node = entity_descriptor_node.find(
- ".//ds:DigestMethod",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://www.w3.org/2001/04/xmlenc#sha256",
- digest_algorithm_node.attrib["Algorithm"],
- )
-
- assertion_consuming_service_node = entity_descriptor_node.find(
- ".//md:AssertionConsumerService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://test-entity.id/digid/acs/",
- assertion_consuming_service_node.attrib["Location"],
- )
-
- attribute_consuming_service_node = entity_descriptor_node.find(
- ".//md:AttributeConsumingService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("9050", attribute_consuming_service_node.attrib["index"])
-
- service_name_node = entity_descriptor_node.find(
- ".//md:ServiceName",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test Service Name", service_name_node.text)
-
- service_description_node = entity_descriptor_node.find(
- ".//md:ServiceDescription",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test Service Description", service_description_node.text)
-
- organisation_name_node = entity_descriptor_node.find(
- ".//md:OrganizationName",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test organisation", organisation_name_node.text)
-
- organisation_display_node = entity_descriptor_node.find(
- ".//md:OrganizationDisplayName",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test organisation", organisation_display_node.text)
-
- organisation_url_node = entity_descriptor_node.find(
- ".//md:OrganizationURL",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("http://test-organisation.nl", organisation_url_node.text)
-
- contact_person_node = entity_descriptor_node.find(
- ".//md:ContactPerson",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("technical", contact_person_node.attrib["contactType"])
-
- contact_email_node = entity_descriptor_node.find(
- ".//md:EmailAddress",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("test@test.nl", contact_email_node.text)
-
- contact_telephone_node = entity_descriptor_node.find(
- ".//md:TelephoneNumber",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("06123123123", contact_telephone_node.text)
-
- slo_nodes = entity_descriptor_node.findall(
- ".//md:SingleLogoutService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(len(slo_nodes), 2)
- slo_soap, slo_redirect = slo_nodes
- self.assertEqual(
- slo_soap.attrib["Location"], "http://test-entity.id/digid/slo/soap/"
- )
- self.assertEqual(
- slo_soap.attrib["Binding"], "urn:oasis:names:tc:SAML:2.0:bindings:SOAP"
- )
- self.assertEqual(
- slo_redirect.attrib["Location"], "http://test-entity.id/digid/slo/redirect/"
- )
- self.assertEqual(
- slo_redirect.attrib["Binding"],
- "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
- )
-
- def test_missing_required_properties(self):
- expected_error = (
- "Error: the following arguments are required: --key-file, --cert-file, "
- "--entity-id, --base-url, --service-name, --service-description, "
- "--save-config/--no-save-config"
- )
-
- with self.assertRaisesMessage(CommandError, expected_error):
- call_command("generate_digid_metadata")
-
- def test_contact_telephone_no_email(self):
- stdout = StringIO()
-
- call_command(
- "generate_digid_metadata",
- "--no-save-config",
- "--slo",
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(DIGID_TEST_KEY_FILE),
- cert_file=str(DIGID_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- technical_contact_person_telephone="06123123123",
- test=True,
- stdout=stdout,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- contact_email_node = entity_descriptor_node.find(
- ".//md:EmailAddress",
- namespaces=NAME_SPACES,
- )
- contact_telephone_node = entity_descriptor_node.find(
- ".//md:TelephoneNumber",
- namespaces=NAME_SPACES,
- )
-
- self.assertIsNone(contact_email_node)
- self.assertIsNone(contact_telephone_node)
-
- def test_organisation_url_no_service(self):
- stdout = StringIO()
-
- call_command(
- "generate_digid_metadata",
- "--no-save-config",
- "--slo",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(DIGID_TEST_KEY_FILE),
- cert_file=str(DIGID_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- organization_url="http://test-organisation.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- organisation_name_node = entity_descriptor_node.find(
- ".//md:OrganizationName",
- namespaces=NAME_SPACES,
- )
- organisation_display_node = entity_descriptor_node.find(
- ".//md:OrganizationDisplayName",
- namespaces=NAME_SPACES,
- )
- organisation_url_node = entity_descriptor_node.find(
- ".//md:OrganizationURL",
- namespaces=NAME_SPACES,
- )
-
- self.assertIsNone(organisation_name_node)
- self.assertIsNone(organisation_display_node)
- self.assertIsNone(organisation_url_node)
-
- def test_slo_not_supported(self):
- stdout = StringIO()
-
- call_command(
- "generate_digid_metadata",
- "--no-save-config",
- "--no-slo",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(DIGID_TEST_KEY_FILE),
- cert_file=str(DIGID_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- single_logout_service_node = entity_descriptor_node.find(
- ".//md:SingleLogoutService",
- namespaces=NAME_SPACES,
- )
- self.assertIsNone(single_logout_service_node)
-
- def test_management_command_and_update_config(self):
- stdout = StringIO()
- assert not DigidConfiguration.objects.exists()
-
- call_command(
- "generate_digid_metadata",
- "--save-config",
- "--want-assertions-encrypted",
- "--no-only-assertions-signed",
- ["--attribute-consuming-service-index", "1"],
- key_file=str(DIGID_TEST_KEY_FILE),
- cert_file=str(DIGID_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- stdout=stdout,
- test=True,
- )
-
- self.assertTrue(DigidConfiguration.objects.exists())
- config = DigidConfiguration.get_solo()
- self.assertTrue(config.want_assertions_encrypted)
- self.assertFalse(config.want_assertions_signed)
- self.assertEqual(config.service_name, "Test Service Name")
- self.assertEqual(config.service_description, "Test Service Description")
- self.assertEqual(config.attribute_consuming_service_index, "1")
-
- self.assertIsNotNone(config.certificate)
-
- with config.certificate.private_key.open("rb") as privkey:
- with DIGID_TEST_KEY_FILE.open("rb") as source_privkey:
- self.assertEqual(privkey.read(), source_privkey.read())
-
- with config.certificate.public_certificate.open("rb") as cert:
- with DIGID_TEST_CERTIFICATE_FILE.open("rb") as source_cert:
- self.assertEqual(cert.read(), source_cert.read())
-
-
-@pytest.mark.django_db
-def test_properties_in_db_config_not_required(digid_config):
- """
- Assert that required properties already configured don't cause problems.
- """
- digid_config.service_description = "CLI test"
- digid_config.save()
- try:
- call_command(
- "generate_digid_metadata",
- "--no-save-config",
- "--test",
- stdout=StringIO(),
- )
- except CommandError:
- pytest.fail("Database configuration is valid for management commands.")
-
-
@pytest.mark.usefixtures("digid_config", "temp_private_root")
class DigidMetadataGenerationTests(DigidMetadataMixin, TestCase):
def test_generate_metadata_all_options_specified(self):
@@ -540,3 +214,55 @@ def test_slo_not_supported(self):
namespaces=NAME_SPACES,
)
self.assertIsNone(single_logout_service_node)
+
+
+@pytest.mark.django_db
+def test_current_and_next_certificate_in_metadata(
+ temp_private_root,
+ digid_config: DigidConfiguration,
+ digid_certificate: Certificate,
+ next_certificate: Certificate,
+):
+ ConfigCertificate.objects.create(
+ config_type=ConfigTypes.digid,
+ certificate=next_certificate,
+ )
+ assert ConfigCertificate.objects.count() == 2 # expect current and next
+
+ digid_metadata = generate_digid_metadata()
+
+ entity_descriptor_node = etree.XML(digid_metadata)
+
+ metadata_node = entity_descriptor_node.find(
+ "md:SPSSODescriptor", namespaces=NAME_SPACES
+ )
+ assert metadata_node is not None
+ key_nodes = metadata_node.findall("md:KeyDescriptor", namespaces=NAME_SPACES)
+ assert len(key_nodes) == 2 # we expect current + next key
+ key1_node, key2_node = key_nodes
+ assert key1_node.attrib["use"] == "signing"
+ assert key2_node.attrib["use"] == "signing"
+
+ with (
+ digid_certificate.public_certificate.open("r") as _current,
+ next_certificate.public_certificate.open("r") as _next,
+ ):
+ current_base64 = _current.read().replace("\n", "")
+ next_base64 = _next.read().replace("\n", "")
+
+ # certificate nodes include only the base64 encoded PEM data, without header/footer
+ cert1_node = key1_node.find(
+ "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES
+ )
+ assert cert1_node is not None
+ assert cert1_node.text is not None
+ assert (cert_data_1 := cert1_node.text.strip()) in current_base64
+
+ cert2_node = key2_node.find(
+ "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES
+ )
+ assert cert2_node is not None
+ assert cert2_node.text is not None
+ assert (cert_data_2 := cert2_node.text.strip()) in next_base64
+ # they should not be the same
+ assert cert_data_1 != cert_data_2
diff --git a/tests/test_eherkenning_metadata.py b/tests/test_eherkenning_metadata.py
index 00c0f7b..c2fe4d9 100644
--- a/tests/test_eherkenning_metadata.py
+++ b/tests/test_eherkenning_metadata.py
@@ -1,19 +1,16 @@
-from io import StringIO
-
-from django.core.management import CommandError, call_command
from django.test import TestCase
import pytest
from lxml import etree
-from privates.test import temp_private_root
+from simple_certmanager.models import Certificate
-from digid_eherkenning.models import EherkenningConfiguration
+from digid_eherkenning.choices import ConfigTypes
+from digid_eherkenning.models import ConfigCertificate, EherkenningConfiguration
from digid_eherkenning.saml2.eherkenning import (
eHerkenningClient,
generate_eherkenning_metadata,
)
-from .conftest import EHERKENNING_TEST_CERTIFICATE_FILE, EHERKENNING_TEST_KEY_FILE
from .mixins import EherkenningMetadataMixin
NAME_SPACES = {
@@ -22,345 +19,6 @@
}
-@temp_private_root()
-class EHerkenningMetadataManagementCommandTests(TestCase):
- def test_generate_metadata_all_options_specified(self):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_metadata",
- "--no-save-config",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- eh_attribute_consuming_service_index="9050",
- eidas_attribute_consuming_service_index="9051",
- oin="00000001112223330000",
- service_name="Test Service Name",
- service_description="Test Service Description",
- technical_contact_person_telephone="06123123123",
- technical_contact_person_email="test@test.nl",
- organization_name="Test organisation",
- organization_url="http://test-organisation.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- self.assertEqual(
- "http://test-entity.id", entity_descriptor_node.attrib["entityID"]
- )
-
- sspo_descriptor_node = entity_descriptor_node.find(
- ".//md:SPSSODescriptor",
- namespaces=NAME_SPACES,
- )
-
- self.assertEqual("true", sspo_descriptor_node.attrib["AuthnRequestsSigned"])
- self.assertEqual("true", sspo_descriptor_node.attrib["WantAssertionsSigned"])
-
- certificate_node = entity_descriptor_node.find(
- ".//ds:X509Certificate",
- namespaces=NAME_SPACES,
- )
- self.assertIn(
- "MIIC0DCCAbigAwIBAgIUEjGmfCGa1cOiTi+UKtDQVtySOHUwDQYJKoZIhvcNAQEL",
- certificate_node.text,
- )
-
- signature_algorithm_node = entity_descriptor_node.find(
- ".//ds:SignatureMethod",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- signature_algorithm_node.attrib["Algorithm"],
- )
-
- digest_algorithm_node = entity_descriptor_node.find(
- ".//ds:DigestMethod",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://www.w3.org/2001/04/xmlenc#sha256",
- digest_algorithm_node.attrib["Algorithm"],
- )
-
- assertion_consuming_service_node = entity_descriptor_node.find(
- ".//md:AssertionConsumerService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(
- "http://test-entity.id/eherkenning/acs/",
- assertion_consuming_service_node.attrib["Location"],
- )
-
- attribute_consuming_service_nodes = entity_descriptor_node.findall(
- ".//md:AttributeConsumingService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(2, len(attribute_consuming_service_nodes))
-
- eh_attribute_consuming_service_node = attribute_consuming_service_nodes[0]
- eidas_attribute_consuming_service_node = attribute_consuming_service_nodes[1]
-
- self.assertEqual(
- "urn:etoegang:DV:00000001112223330000:services:9050",
- eh_attribute_consuming_service_node.find(
- ".//md:RequestedAttribute", namespaces=NAME_SPACES
- ).attrib["Name"],
- )
- self.assertEqual(
- "Test Service Name",
- eh_attribute_consuming_service_node.find(
- ".//md:ServiceName", namespaces=NAME_SPACES
- ).text,
- )
- self.assertEqual(
- "Test Service Description",
- eh_attribute_consuming_service_node.find(
- ".//md:ServiceDescription", namespaces=NAME_SPACES
- ).text,
- )
- self.assertEqual(
- "urn:etoegang:DV:00000001112223330000:services:9051",
- eidas_attribute_consuming_service_node.find(
- ".//md:RequestedAttribute", namespaces=NAME_SPACES
- ).attrib["Name"],
- )
- self.assertEqual(
- "Test Service Name (eIDAS)",
- eidas_attribute_consuming_service_node.find(
- ".//md:ServiceName", namespaces=NAME_SPACES
- ).text,
- )
- self.assertEqual(
- "Test Service Description",
- eidas_attribute_consuming_service_node.find(
- ".//md:ServiceDescription", namespaces=NAME_SPACES
- ).text,
- )
-
- organisation_name_node = entity_descriptor_node.find(
- ".//md:OrganizationName",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test organisation", organisation_name_node.text)
-
- organisation_display_node = entity_descriptor_node.find(
- ".//md:OrganizationDisplayName",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("Test organisation", organisation_display_node.text)
-
- organisation_url_node = entity_descriptor_node.find(
- ".//md:OrganizationURL",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("http://test-organisation.nl", organisation_url_node.text)
-
- contact_person_node = entity_descriptor_node.find(
- ".//md:ContactPerson",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("technical", contact_person_node.attrib["contactType"])
-
- contact_email_node = entity_descriptor_node.find(
- ".//md:EmailAddress",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("test@test.nl", contact_email_node.text)
-
- contact_telephone_node = entity_descriptor_node.find(
- ".//md:TelephoneNumber",
- namespaces=NAME_SPACES,
- )
- self.assertEqual("06123123123", contact_telephone_node.text)
-
- def test_missing_required_properties(self):
- with self.assertRaises(CommandError):
- call_command("generate_eherkenning_metadata")
-
- def test_contact_telephone_no_email(self):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_metadata",
- "--no-save-config",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- oin="00000001112223330000",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- technical_contact_person_telephone="06123123123",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- contact_email_node = entity_descriptor_node.find(
- ".//md:EmailAddress",
- namespaces=NAME_SPACES,
- )
- contact_telephone_node = entity_descriptor_node.find(
- ".//md:TelephoneNumber",
- namespaces=NAME_SPACES,
- )
-
- self.assertIsNone(contact_email_node)
- self.assertIsNone(contact_telephone_node)
-
- def test_organisation_url_no_service(self):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_metadata",
- "--no-save-config",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- oin="00000001112223330000",
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- organization_url="http://test-organisation.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- organisation_name_node = entity_descriptor_node.find(
- ".//md:OrganizationName",
- namespaces=NAME_SPACES,
- )
- organisation_display_node = entity_descriptor_node.find(
- ".//md:OrganizationDisplayName",
- namespaces=NAME_SPACES,
- )
- organisation_url_node = entity_descriptor_node.find(
- ".//md:OrganizationURL",
- namespaces=NAME_SPACES,
- )
-
- self.assertIsNone(organisation_name_node)
- self.assertIsNone(organisation_display_node)
- self.assertIsNone(organisation_url_node)
-
- def test_no_eidas_service(self):
- stdout = StringIO()
-
- call_command(
- "generate_eherkenning_metadata",
- "--no-save-config",
- stdout=stdout,
- want_assertions_encrypted=True,
- want_assertions_signed=True,
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- signature_algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
- digest_algorithm="http://www.w3.org/2001/04/xmlenc#sha256",
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- eh_attribute_consuming_service_index="9050",
- oin="00000001112223330000",
- no_eidas=True,
- service_name="Test Service Name",
- service_description="Test Service Description",
- technical_contact_person_telephone="06123123123",
- technical_contact_person_email="test@test.nl",
- organization_name="Test organisation",
- organization_url="http://test-organisation.nl",
- test=True,
- )
-
- output = stdout.getvalue()
- entity_descriptor_node = etree.XML(output.encode("utf-8"))
-
- attribute_consuming_service_nodes = entity_descriptor_node.findall(
- ".//md:AttributeConsumingService",
- namespaces=NAME_SPACES,
- )
- self.assertEqual(1, len(attribute_consuming_service_nodes))
-
- eh_attribute_consuming_service_node = attribute_consuming_service_nodes[0]
-
- self.assertEqual(
- "urn:etoegang:DV:00000001112223330000:services:9050",
- eh_attribute_consuming_service_node.find(
- ".//md:RequestedAttribute", namespaces=NAME_SPACES
- ).attrib["Name"],
- )
- self.assertEqual(
- "Test Service Name",
- eh_attribute_consuming_service_node.find(
- ".//md:ServiceName", namespaces=NAME_SPACES
- ).text,
- )
- self.assertEqual(
- "Test Service Description",
- eh_attribute_consuming_service_node.find(
- ".//md:ServiceDescription", namespaces=NAME_SPACES
- ).text,
- )
-
- def test_management_command_and_update_config(self):
- stdout = StringIO()
- assert not EherkenningConfiguration.objects.exists()
-
- call_command(
- "generate_eherkenning_metadata",
- "--save-config",
- "--want-assertions-encrypted",
- "--no-only-assertions-signed",
- ["--eh-attribute-consuming-service-index", "1"],
- key_file=str(EHERKENNING_TEST_KEY_FILE),
- cert_file=str(EHERKENNING_TEST_CERTIFICATE_FILE),
- entity_id="http://test-entity.id",
- base_url="http://test-entity.id",
- service_name="Test Service Name",
- service_description="Test Service Description",
- oin="01234567890123456789",
- stdout=stdout,
- test=True,
- )
-
- self.assertTrue(EherkenningConfiguration.objects.exists())
- config = EherkenningConfiguration.get_solo()
- self.assertTrue(config.want_assertions_encrypted)
- self.assertFalse(config.want_assertions_signed)
- self.assertEqual(config.oin, "01234567890123456789")
- self.assertEqual(config.service_name, "Test Service Name")
- self.assertEqual(config.service_description, "Test Service Description")
- self.assertEqual(config.eh_attribute_consuming_service_index, "1")
-
- self.assertIsNotNone(config.certificate)
-
- with config.certificate.private_key.open("rb") as privkey:
- with EHERKENNING_TEST_KEY_FILE.open("rb") as source_privkey:
- self.assertEqual(privkey.read(), source_privkey.read())
-
- with config.certificate.public_certificate.open("rb") as cert:
- with EHERKENNING_TEST_CERTIFICATE_FILE.open("rb") as source_cert:
- self.assertEqual(cert.read(), source_cert.read())
-
-
@pytest.mark.usefixtures("eherkenning_config", "temp_private_root")
class EHerkenningClientTests(TestCase):
def test_attribute_consuming_services_with_non_required_requested_attribute(self):
@@ -714,3 +372,55 @@ def test_no_eidas_service(self):
".//md:ServiceDescription", namespaces=NAME_SPACES
).text,
)
+
+
+@pytest.mark.django_db
+def test_current_and_next_certificate_in_metadata(
+ temp_private_root,
+ eherkenning_config: EherkenningConfiguration,
+ eherkenning_certificate: Certificate,
+ next_certificate: Certificate,
+):
+ ConfigCertificate.objects.create(
+ config_type=ConfigTypes.eherkenning,
+ certificate=next_certificate,
+ )
+ assert ConfigCertificate.objects.count() == 2 # expect current and next
+
+ eh_metadata = generate_eherkenning_metadata()
+
+ entity_descriptor_node = etree.XML(eh_metadata)
+
+ metadata_node = entity_descriptor_node.find(
+ "md:SPSSODescriptor", namespaces=NAME_SPACES
+ )
+ assert metadata_node is not None
+ key_nodes = metadata_node.findall("md:KeyDescriptor", namespaces=NAME_SPACES)
+ assert len(key_nodes) == 2 # we expect current + next key
+ key1_node, key2_node = key_nodes
+ assert key1_node.attrib["use"] == "signing"
+ assert key2_node.attrib["use"] == "signing"
+
+ with (
+ eherkenning_certificate.public_certificate.open("r") as _current,
+ next_certificate.public_certificate.open("r") as _next,
+ ):
+ current_base64 = _current.read().replace("\n", "")
+ next_base64 = _next.read().replace("\n", "")
+
+ # certificate nodes include only the base64 encoded PEM data, without header/footer
+ cert1_node = key1_node.find(
+ "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES
+ )
+ assert cert1_node is not None
+ assert cert1_node.text is not None
+ assert (cert_data_1 := cert1_node.text.strip()) in current_base64
+
+ cert2_node = key2_node.find(
+ "ds:KeyInfo/ds:X509Data/ds:X509Certificate", namespaces=NAME_SPACES
+ )
+ assert cert2_node is not None
+ assert cert2_node.text is not None
+ assert (cert_data_2 := cert2_node.text.strip()) in next_base64
+ # they should not be the same
+ assert cert_data_1 != cert_data_2
diff --git a/tests/test_eherkenning_views.py b/tests/test_eherkenning_views.py
index ffb8774..7a45122 100644
--- a/tests/test_eherkenning_views.py
+++ b/tests/test_eherkenning_views.py
@@ -3,7 +3,7 @@
from unittest.mock import patch
from django.conf import settings
-from django.test import RequestFactory, TestCase, override_settings
+from django.test import TestCase, override_settings
from django.urls import reverse
from django.utils import timezone
@@ -14,9 +14,7 @@
from lxml import etree
from onelogin.saml2.utils import OneLogin_Saml2_Utils
-from digid_eherkenning.choices import AssuranceLevels
from digid_eherkenning.models import EherkenningConfiguration
-from digid_eherkenning.views import eHerkenningLoginView
from .project.models import User
from .utils import create_example_artifact, get_saml_element
@@ -146,7 +144,8 @@ def setUp(self):
config = EherkenningConfiguration.get_solo()
- with config.certificate.public_certificate.open("r") as cert_file:
+ current_cert, _ = config.select_certificates()
+ with current_cert.public_certificate.open("r") as cert_file:
cert = cert_file.read()
encrypted_attribute = OneLogin_Saml2_Utils.generate_name_id(
diff --git a/tests/test_migrations.py b/tests/test_migrations.py
index b10086a..782bc7d 100644
--- a/tests/test_migrations.py
+++ b/tests/test_migrations.py
@@ -7,11 +7,10 @@
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from simple_certmanager.constants import CertificateTypes
-from simple_certmanager.models import Certificate
from simple_certmanager.test.certificate_generation import key_to_pem
from simple_certmanager.utils import load_pem_x509_private_key
-from digid_eherkenning.choices import AssuranceLevels
+from digid_eherkenning.choices import AssuranceLevels, ConfigTypes
@pytest.mark.django_db()
@@ -43,7 +42,7 @@ def test_fixing_misconfigured_eherkenning(migrator):
@pytest.mark.django_db
def test_decrypt_private_keys_with_passphrase(
- migrator, encrypted_keypair: tuple[bytes, bytes]
+ temp_private_root, migrator, encrypted_keypair: tuple[bytes, bytes]
):
old_state = migrator.apply_initial_migration(
("digid_eherkenning", "0008_update_loa_fields")
@@ -163,6 +162,7 @@ def _decryption_skip_cases_idfn(case):
)
@pytest.mark.django_db
def test_decryption_migration_robustness(
+ temp_private_root,
migrator,
leaf_keypair: tuple[rsa.RSAPrivateKey, bytes],
encrypted_keypair: tuple[bytes, bytes],
@@ -202,3 +202,64 @@ def test_decryption_migration_robustness(
)
except Exception:
pytest.fail("Expected migration not to crash")
+
+
+@pytest.mark.parametrize(
+ "model_name,config_type",
+ [
+ ("DigidConfiguration", ConfigTypes.digid),
+ ("EherkenningConfiguration", ConfigTypes.eherkenning),
+ ],
+)
+def test_move_certificate(
+ temp_private_root,
+ migrator,
+ leaf_keypair,
+ model_name,
+ config_type,
+):
+ key, cert_pem = leaf_keypair
+ key_pem = key_to_pem(key, passphrase="")
+ old_state = migrator.apply_initial_migration(
+ (
+ "digid_eherkenning",
+ "0011_configcertificate_configcertificate_uniq_config_cert",
+ )
+ )
+ OldConfig = old_state.apps.get_model("digid_eherkenning", model_name)
+ Certificate = old_state.apps.get_model("simple_certmanager", "Certificate")
+ certificate = Certificate.objects.create(
+ label="Test certificate",
+ type=CertificateTypes.key_pair,
+ public_certificate=File(BytesIO(cert_pem), name="client_cert.pem"),
+ private_key=File(BytesIO(key_pem), name="client_key.pem"),
+ )
+ OldConfig.objects.create(certificate=certificate)
+
+ new_state = migrator.apply_tested_migration(
+ ("digid_eherkenning", "0012_move_config_certificate")
+ )
+
+ ConfigCertificate = new_state.apps.get_model(
+ "digid_eherkenning", "ConfigCertificate"
+ )
+ instance = ConfigCertificate.objects.get(certificate=certificate.pk)
+ assert instance.config_type == config_type
+
+
+def test_move_certificate_noop(migrator):
+ migrator.apply_initial_migration(
+ (
+ "digid_eherkenning",
+ "0011_configcertificate_configcertificate_uniq_config_cert",
+ )
+ )
+ # there are no config records on a fresh install
+ new_state = migrator.apply_tested_migration(
+ ("digid_eherkenning", "0012_move_config_certificate")
+ )
+
+ ConfigCertificate = new_state.apps.get_model(
+ "digid_eherkenning", "ConfigCertificate"
+ )
+ assert not ConfigCertificate.objects.exists()