Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: app entitlements #12090

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/docker-push-variables/push_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

# Decide if we should push the image or not
should_push = True
if len(os.environ.get("DOCKER_USERNAME", "")) > 0:
if len(os.environ.get("DOCKER_USERNAME", "")) < 1:
# Don't push if we don't have DOCKER_USERNAME, i.e. no secrets are available
should_push = False
if os.environ.get("GITHUB_REPOSITORY").lower() == "goauthentik/authentik-internal":
Expand Down
54 changes: 54 additions & 0 deletions authentik/core/api/application_entitlements.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Application Roles API Viewset"""

from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.viewsets import ModelViewSet

from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import (
Application,
ApplicationEntitlement,
User,
)


class ApplicationEntitlementSerializer(ModelSerializer):
"""ApplicationEntitlement Serializer"""

def validate_app(self, app: Application) -> Application:
"""Ensure user has permission to view"""
user: User = self._context["request"].user
if user.has_perm("view_application", app) or user.has_perm(
"authentik_core.view_application"
):
return app
raise ValidationError(_("User does not have access to application."), code="invalid")

class Meta:
model = ApplicationEntitlement
fields = [
"pbm_uuid",
"name",
"app",
"attributes",
]


class ApplicationEntitlementViewSet(UsedByMixin, ModelViewSet):
"""ApplicationEntitlement Viewset"""

queryset = ApplicationEntitlement.objects.all()
serializer_class = ApplicationEntitlementSerializer
search_fields = [
"pbm_uuid",
"name",
"app",
"attributes",
]
filterset_fields = [
"pbm_uuid",
"name",
"app",
]
ordering = ["name"]
45 changes: 45 additions & 0 deletions authentik/core/migrations/0041_applicationentitlement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Generated by Django 5.0.9 on 2024-11-20 15:16

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("authentik_core", "0040_provider_invalidation_flow"),
("authentik_policies", "0011_policybinding_failure_result_and_more"),
]

operations = [
migrations.CreateModel(
name="ApplicationEntitlement",
fields=[
(
"policybindingmodel_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_policies.policybindingmodel",
),
),
("attributes", models.JSONField(blank=True, default=dict)),
("name", models.TextField()),
(
"app",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to="authentik_core.application"
),
),
],
options={
"verbose_name": "Application Entitlement",
"verbose_name_plural": "Application Entitlements",
"unique_together": {("app", "name")},
},
bases=("authentik_policies.policybindingmodel", models.Model),
),
]
41 changes: 41 additions & 0 deletions authentik/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,25 @@
always_merger.merge(final_attributes, self.attributes)
return final_attributes

def app_entitlements(self, app: "Application | None") -> QuerySet["ApplicationEntitlement"]:
"""Get all entitlements this user has for `app`."""
if not app:
return []

Check warning on line 320 in authentik/core/models.py

View check run for this annotation

Codecov / codecov/patch

authentik/core/models.py#L320

Added line #L320 was not covered by tests
all_groups = self.all_groups()
qs = app.applicationentitlement_set.filter(
Q(
Q(bindings__user=self) | Q(bindings__group__in=all_groups),
bindings__negate=False,
)
| Q(
Q(~Q(bindings__user=self), bindings__user__isnull=False)
| Q(~Q(bindings__group__in=all_groups), bindings__group__isnull=False),
bindings__negate=True,
),
bindings__enabled=True,
)
return qs

@property
def serializer(self) -> Serializer:
from authentik.core.api.users import UserSerializer
Expand Down Expand Up @@ -581,6 +600,28 @@
verbose_name_plural = _("Applications")


class ApplicationEntitlement(AttributesMixin, SerializerModel, PolicyBindingModel):
"""Application-scoped entitlement to control authorization in an application"""

name = models.TextField()

app = models.ForeignKey(Application, on_delete=models.CASCADE)

class Meta:
verbose_name = _("Application Entitlement")
verbose_name_plural = _("Application Entitlements")
unique_together = (("app", "name"),)

def __str__(self):
return f"Application Entitlement {self.name} for app {self.app_id}"

@property
def serializer(self) -> type[Serializer]:
from authentik.core.api.application_entitlements import ApplicationEntitlementSerializer

return ApplicationEntitlementSerializer


class SourceUserMatchingModes(models.TextChoices):
"""Different modes a source can handle new/returning users"""

Expand Down
116 changes: 116 additions & 0 deletions authentik/core/tests/test_application_entitlements.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Test Application Entitlements API"""

from django.urls import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase

from authentik.core.models import Application, ApplicationEntitlement, Group
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import OAuth2Provider


class TestApplicationEntitlements(APITestCase):
"""Test application entitlements"""

def setUp(self) -> None:
self.user = create_test_user()
self.other_user = create_test_user()
self.provider = OAuth2Provider.objects.create(
name="test",
authorization_flow=create_test_flow(),
)
self.app: Application = Application.objects.create(
name=generate_id(),
slug=generate_id(),
provider=self.provider,
)

def test_user(self):
"""Test user-direct assignment"""
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, user=self.user, order=0)
ents = self.user.app_entitlements(self.app)
self.assertEqual(len(ents), 1)
self.assertEqual(ents[0].name, ent.name)

def test_group(self):
"""Test direct group"""
group = Group.objects.create(name=generate_id())
self.user.ak_groups.add(group)
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=group, order=0)
ents = self.user.app_entitlements(self.app)
self.assertEqual(len(ents), 1)
self.assertEqual(ents[0].name, ent.name)

def test_group_indirect(self):
"""Test indirect group"""
parent = Group.objects.create(name=generate_id())
group = Group.objects.create(name=generate_id(), parent=parent)
self.user.ak_groups.add(group)
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=parent, order=0)
ents = self.user.app_entitlements(self.app)
self.assertEqual(len(ents), 1)
self.assertEqual(ents[0].name, ent.name)

def test_negate_user(self):
"""Test with negate flag"""
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, user=self.other_user, order=0, negate=True)
ents = self.user.app_entitlements(self.app)
self.assertEqual(len(ents), 1)
self.assertEqual(ents[0].name, ent.name)

def test_negate_group(self):
"""Test with negate flag"""
other_group = Group.objects.create(name=generate_id())
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=other_group, order=0, negate=True)
ents = self.user.app_entitlements(self.app)
self.assertEqual(len(ents), 1)
self.assertEqual(ents[0].name, ent.name)

def test_api_perms_global(self):
"""Test API creation with global permissions"""
assign_perm("authentik_core.add_applicationentitlement", self.user)
assign_perm("authentik_core.view_application", self.user)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),
data={
"name": generate_id(),
"app": self.app.pk,
},
)
self.assertEqual(res.status_code, 201)

def test_api_perms_scoped(self):
"""Test API creation with scoped permissions"""
assign_perm("authentik_core.add_applicationentitlement", self.user)
assign_perm("authentik_core.view_application", self.user, self.app)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),
data={
"name": generate_id(),
"app": self.app.pk,
},
)
self.assertEqual(res.status_code, 201)

def test_api_perms_missing(self):
"""Test API creation with no permissions"""
assign_perm("authentik_core.add_applicationentitlement", self.user)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),
data={
"name": generate_id(),
"app": self.app.pk,
},
)
self.assertEqual(res.status_code, 400)
self.assertJSONEqual(res.content, {"app": ["User does not have access to application."]})
2 changes: 2 additions & 0 deletions authentik/core/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.contrib.auth.decorators import login_required
from django.urls import path

from authentik.core.api.application_entitlements import ApplicationEntitlementViewSet
from authentik.core.api.applications import ApplicationViewSet
from authentik.core.api.authenticated_sessions import AuthenticatedSessionViewSet
from authentik.core.api.devices import AdminDeviceViewSet, DeviceViewSet
Expand Down Expand Up @@ -69,6 +70,7 @@
api_urlpatterns = [
("core/authenticated_sessions", AuthenticatedSessionViewSet),
("core/applications", ApplicationViewSet),
("core/application_entitlements", ApplicationEntitlementViewSet),
path(
"core/transactional/applications/",
TransactionalApplicationView.as_view(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Generated by Django 4.2.5 on 2023-09-13 18:07
import authentik.lib.models
import django.db.models.deletion

from django.db import migrations, models

Expand All @@ -23,4 +25,13 @@ class Migration(migrations.Migration):
default=30, help_text="Timeout after which Policy execution is terminated."
),
),
migrations.AlterField(
model_name="policybinding",
name="target",
field=authentik.lib.models.InheritanceForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="bindings",
to="authentik_policies.policybindingmodel",
),
),
]
4 changes: 3 additions & 1 deletion authentik/policies/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class PolicyBinding(SerializerModel):
blank=True,
)

target = InheritanceForeignKey(PolicyBindingModel, on_delete=models.CASCADE, related_name="+")
target = InheritanceForeignKey(
PolicyBindingModel, on_delete=models.CASCADE, related_name="bindings"
)
negate = models.BooleanField(
default=False,
help_text=_("Negates the outcome of the policy. Messages are unaffected."),
Expand Down
4 changes: 4 additions & 0 deletions authentik/providers/oauth2/tests/test_userinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def test_userinfo_normal(self):
"nickname": self.user.name,
"groups": [group.name for group in self.user.ak_groups.all()],
"sub": "bar",
"roles": [],
"entitlements": [],
},
)
self.assertEqual(res.status_code, 200)
Expand All @@ -90,6 +92,8 @@ def test_userinfo_invalid_scope(self):
"nickname": self.user.name,
"groups": [group.name for group in self.user.ak_groups.all()],
"sub": "bar",
"roles": [],
"entitlements": [],
},
)
self.assertEqual(res.status_code, 200)
Expand Down
1 change: 1 addition & 0 deletions authentik/providers/proxy/controllers/k8s/traefik_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def get_reference_object(self) -> TraefikMiddleware:
authResponseHeaders=[
"X-authentik-username",
"X-authentik-groups",
"X-authentik-entitlements",
"X-authentik-email",
"X-authentik-name",
"X-authentik-uid",
Expand Down
Loading
Loading