diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5011b3ebb2b35..ace9650f50f28 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -57,6 +57,7 @@ import re2 import sqlalchemy_jsonfield from dateutil.relativedelta import relativedelta +from packaging import version as packaging_version from sqlalchemy import ( Boolean, Column, @@ -116,6 +117,7 @@ clear_task_instances, ) from airflow.models.tasklog import LogTemplate +from airflow.providers.fab import __version__ as FAB_VERSION from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.security import permissions from airflow.settings import json @@ -940,16 +942,26 @@ def update_old_perm(permission: str): updated_access_control = {} for role, perms in access_control.items(): - updated_access_control[role] = updated_access_control.get(role, {}) - if isinstance(perms, (set, list)): - # Support for old-style access_control where only the actions are specified - updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) + if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"): + updated_access_control[role] = updated_access_control.get(role, {}) + if isinstance(perms, (set, list)): + # Support for old-style access_control where only the actions are specified + updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) + else: + updated_access_control[role] = perms + if permissions.RESOURCE_DAG in updated_access_control[role]: + updated_access_control[role][permissions.RESOURCE_DAG] = { + update_old_perm(perm) + for perm in updated_access_control[role][permissions.RESOURCE_DAG] + } + elif isinstance(perms, dict): + # Not allow new access control format with old FAB versions + raise AirflowException( + "Please upgrade the FAB provider to a version >= 1.3.0 to allow " + "use the Dag Level Access Control new format." 
+ ) else: - updated_access_control[role] = perms - if permissions.RESOURCE_DAG in updated_access_control[role]: - updated_access_control[role][permissions.RESOURCE_DAG] = { - update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG] - } + updated_access_control[role] = {update_old_perm(perm) for perm in perms} return updated_access_control diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 76d72a4585009..8ccdef2c6390c 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -27,7 +27,6 @@ import os import sys import types -from cgitb import Hook from pathlib import Path from typing import TYPE_CHECKING, Any, Iterable @@ -78,7 +77,7 @@ registered_operator_link_classes: dict[str, type] | None = None registered_ti_dep_classes: dict[str, type] | None = None timetable_classes: dict[str, type[Timetable]] | None = None -hook_lineage_reader_classes: list[type[Hook]] | None = None +hook_lineage_reader_classes: list[type[HookLineageReader]] | None = None priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None """ Mapping of class names to class of OperatorLinks registered by plugins. 
diff --git a/airflow/www/yarn.lock b/airflow/www/yarn.lock index 20d9ce752095f..9995fa5213a6f 100644 --- a/airflow/www/yarn.lock +++ b/airflow/www/yarn.lock @@ -4477,7 +4477,7 @@ brace-expansion@^2.0.1: dependencies: balanced-match "^1.0.0" -braces@^3.0.2, braces@^3.0.3: +braces@^3.0.3: version "3.0.3" resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789" integrity sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA== @@ -8853,11 +8853,11 @@ micromark@^3.0.0: uvu "^0.5.0" micromatch@^4.0.4, micromatch@^4.0.5: - version "4.0.5" - resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.5.tgz#bc8999a7cbbf77cdc89f132f6e467051b49090c6" - integrity sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA== + version "4.0.8" + resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202" + integrity sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA== dependencies: - braces "^3.0.2" + braces "^3.0.3" picomatch "^2.3.1" microseconds@0.2.0: diff --git a/docs/apache-airflow-providers-fab/auth-manager/webserver-authentication.rst b/docs/apache-airflow-providers-fab/auth-manager/webserver-authentication.rst index feabad33a806f..48c8c8f1b1f25 100644 --- a/docs/apache-airflow-providers-fab/auth-manager/webserver-authentication.rst +++ b/docs/apache-airflow-providers-fab/auth-manager/webserver-authentication.rst @@ -229,3 +229,94 @@ webserver_config.py itself if you wish. roles = map_roles(teams) log.debug(f"User info from Github: {user_data}\nTeam info from Github: {teams}") return {"username": "github_" + user_data.get("login"), "role_keys": roles} + +Example using team based Authorization with Keycloak +'''''''''''''''''''''''''''''''''''''''''''''''''''''''' +Here is an example of what you might have in your webserver_config.py: + +.. 
code-block:: python + + import os + import jwt + import requests + import logging + from base64 import b64decode + from cryptography.hazmat.primitives import serialization + from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH + from airflow import configuration as conf + from airflow.www.security import AirflowSecurityManager + + log = logging.getLogger(__name__) + + AUTH_TYPE = AUTH_OAUTH + AUTH_USER_REGISTRATION = True + AUTH_ROLES_SYNC_AT_LOGIN = True + AUTH_USER_REGISTRATION_ROLE = "Viewer" + OIDC_ISSUER = "https://sso.keycloak.me/realms/airflow" + + # Make sure you create these roles in Keycloak + AUTH_ROLES_MAPPING = { + "Viewer": ["Viewer"], + "Admin": ["Admin"], + "User": ["User"], + "Public": ["Public"], + "Op": ["Op"], + } + + OAUTH_PROVIDERS = [ + { + "name": "keycloak", + "icon": "fa-key", + "token_key": "access_token", + "remote_app": { + "client_id": "airflow", + "client_secret": "xxx", + "server_metadata_url": "https://sso.keycloak.me/realms/airflow/.well-known/openid-configuration", + "api_base_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect", + "client_kwargs": {"scope": "email profile"}, + "access_token_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/token", + "authorize_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/auth", + "request_token_url": None, + }, + } + ] + + # Fetch public key + req = requests.get(OIDC_ISSUER) + key_der_base64 = req.json()["public_key"] + key_der = b64decode(key_der_base64.encode()) + public_key = serialization.load_der_public_key(key_der) + + + class CustomSecurityManager(AirflowSecurityManager): + def oauth_user_info(self, provider, response): + if provider == "keycloak": + token = response["access_token"] + me = jwt.decode(token, public_key, algorithms=["HS256", "RS256"]) + + # Extract roles from realm access + realm_access = me.get("realm_access", {}) + groups = realm_access.get("roles", []) + + log.info("groups: {0}".format(groups)) 
+ + if not groups: + groups = ["Viewer"] + + userinfo = { + "username": me.get("preferred_username"), + "email": me.get("email"), + "first_name": me.get("given_name"), + "last_name": me.get("family_name"), + "role_keys": groups, + } + + log.info("user info: {0}".format(userinfo)) + + return userinfo + else: + return {} + + + # Make sure to replace this with your own implementation of AirflowSecurityManager class + SECURITY_MANAGER_CLASS = CustomSecurityManager diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index d2b23c02654f3..831990fd68b1e 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2789,6 +2789,63 @@ def test_replace_outdated_access_control_actions(self): assert "permission is deprecated" in str(deprecation_warnings[0].message) assert "permission is deprecated" in str(deprecation_warnings[1].message) + @pytest.mark.parametrize( + "fab_version, perms, expected_exception, expected_perms", + [ + pytest.param( + "1.2.0", + { + "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, + # will raise error in old FAB with new access control format + }, + AirflowException, + None, + id="old_fab_new_access_control_format", + ), + pytest.param( + "1.2.0", + { + "role1": [ + permissions.ACTION_CAN_READ, + permissions.ACTION_CAN_EDIT, + permissions.ACTION_CAN_READ, + ], + }, + None, + {"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}, + id="old_fab_old_access_control_format", + ), + pytest.param( + "1.3.0", + { + "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format + }, + None, + { + "role1": { + permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT} + }, + "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, + }, + id="new_fab_mixed_access_control_format", 
+ ), + ], + ) + def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms): + if expected_exception: + with patch("airflow.models.dag.FAB_VERSION", fab_version): + with pytest.raises( + expected_exception, + match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.", + ): + DAG(dag_id="dag_test", schedule=None, access_control=perms) + else: + with patch("airflow.models.dag.FAB_VERSION", fab_version): + dag = DAG(dag_id="dag_test", schedule=None, access_control=perms) + assert dag.access_control == expected_perms + def test_validate_executor_field_executor_not_configured(self): dag = DAG("test-dag", schedule=None) EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor") diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 277923423e2c6..d7f09c20ff9d3 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -40,6 +40,7 @@ import pytest from dateutil.relativedelta import FR, relativedelta from kubernetes.client import models as k8s +from packaging import version as packaging_version import airflow from airflow.datasets import Dataset @@ -58,6 +59,7 @@ from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator +from airflow.providers.fab import __version__ as FAB_VERSION from airflow.security import permissions from airflow.sensors.bash import BashSensor from airflow.serialization.dag_dependency import DagDependency @@ -246,6 +248,11 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i } }, } + if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0") + else { + "__type": "set", + "__var": [permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT], + } }, }, "edge_info": {},