diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index bfdbb2d7fc885..94ec17ad63d9d 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -21,6 +21,8 @@ from pydantic import BaseModel, Field, field_validator +from airflow.utils.log.secrets_masker import redact + class DagScheduleAssetReference(BaseModel): """DAG schedule reference serializer for assets.""" @@ -58,6 +60,11 @@ class AssetResponse(BaseModel): producing_tasks: list[TaskOutletAssetReference] aliases: list[AssetAliasSchema] + @field_validator("extra", mode="after") + @classmethod + def redact_extra(cls, v: dict): + return redact(v) + class AssetCollectionResponse(BaseModel): """Asset collection response.""" @@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel): created_dagruns: list[DagRunAssetReference] timestamp: datetime + @field_validator("extra", mode="after") + @classmethod + def redact_extra(cls, v: dict): + return redact(v) + class AssetEventCollectionResponse(BaseModel): """Asset event collection response.""" diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 4a3e21035c34f..9f494f53c80c6 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -59,6 +59,22 @@ def _create_assets(session, num: int = 2) -> None: session.commit() +def _create_assets_with_sensitive_extra(session, num: int = 2) -> None: + default_time = "2020-06-11T18:00:00+00:00" + assets = [ + AssetModel( + id=i, + uri=f"s3://bucket/key/{i}", + extra={"password": "bar"}, + created_at=timezone.parse(default_time), + updated_at=timezone.parse(default_time), + ) + for i in range(1, 1 + num) + ] + session.add_all(assets) + session.commit() + + def _create_provided_asset(session, asset: AssetModel) -> None: session.add(asset) session.commit() @@ -82,6 +98,24 @@ def _create_assets_events(session, num: int = 2) -> None: session.commit() +def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None: + default_time = "2020-06-11T18:00:00+00:00" + assets_events = [ + AssetEvent( + id=i, + asset_id=i, + extra={"password": "bar"}, + source_task_id="source_task_id", + source_dag_id="source_dag_id", + source_run_id=f"source_run_id_{i}", + timestamp=timezone.parse(default_time), + ) + for i in range(1, 1 + num) + ] + session.add_all(assets_events) + session.commit() + + def _create_provided_asset_event(session, asset_event: AssetEvent) -> None: session.add(asset_event) session.commit() @@ -142,6 +176,10 @@ def teardown_method(self) -> None: def create_assets(self, session, num: int = 2): _create_assets(session=session, num=num) + @provide_session + def create_assets_with_sensitive_extra(self, session, num: int = 2): + _create_assets_with_sensitive_extra(session=session, num=num) + @provide_session def create_provided_asset(self, session, asset: AssetModel): _create_provided_asset(session=session, asset=asset) @@ -150,6 +188,10 @@ def create_provided_asset(self, session, asset: AssetModel): def create_assets_events(self, session, num: int = 2): _create_assets_events(session=session, num=num) + @provide_session + def create_assets_events_with_sensitive_extra(self, session, num: int = 2): + _create_assets_events_with_sensitive_extra(session=session, num=num) + @provide_session def create_provided_asset_event(self, session, asset_event: AssetEvent): _create_provided_asset_event(session=session, asset_event=asset_event) @@ -439,6 +481,68 @@ def test_limit_and_offset(self, test_client, params, expected_asset_uris): asset_uris = [asset["uri"] for asset in response.json()["asset_events"]] assert asset_uris == expected_asset_uris + @pytest.mark.usefixtures("time_freezer") + @pytest.mark.enable_redact + def test_should_mask_sensitive_extra(self, test_client, session): + self.create_assets_with_sensitive_extra() + self.create_assets_events_with_sensitive_extra() + self.create_dag_run() + self.create_asset_dag_run() + response = test_client.get("/public/assets/events") + assert response.status_code == 200 + response_data = response.json() + assert response_data == { + "asset_events": [ + { + "id": 1, + "asset_id": 1, + "uri": "s3://bucket/key/1", + "extra": {"password": "***"}, + "source_task_id": "source_task_id", + "source_dag_id": "source_dag_id", + "source_run_id": "source_run_id_1", + "source_map_index": -1, + "created_dagruns": [ + { + "run_id": "source_run_id_1", + "dag_id": "source_dag_id", + "logical_date": "2020-06-11T18:00:00Z", + "start_date": "2020-06-11T18:00:00Z", + "end_date": "2020-06-11T18:00:00Z", + "state": "success", + "data_interval_start": "2020-06-11T18:00:00Z", + "data_interval_end": "2020-06-11T18:00:00Z", + } + ], + "timestamp": "2020-06-11T18:00:00Z", + }, + { + "id": 2, + "asset_id": 2, + "uri": "s3://bucket/key/2", + "extra": {"password": "***"}, + "source_task_id": "source_task_id", + "source_dag_id": "source_dag_id", + "source_run_id": "source_run_id_2", + "source_map_index": -1, + "created_dagruns": [ + { + "run_id": "source_run_id_2", + "dag_id": "source_dag_id", + "logical_date": "2020-06-11T18:00:00Z", + "start_date": "2020-06-11T18:00:00Z", + "end_date": "2020-06-11T18:00:00Z", + "state": "success", + "data_interval_start": "2020-06-11T18:00:00Z", + "data_interval_end": "2020-06-11T18:00:00Z", + } + ], + "timestamp": "2020-06-11T18:00:00Z", + }, + ], + "total_entries": 2, + } + class TestGetAssetEndpoint(TestAssets): @pytest.mark.parametrize( @@ -478,6 +582,27 @@ def test_should_respond_404(self, test_client): assert response.status_code == 404 assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + @pytest.mark.usefixtures("time_freezer") + @pytest.mark.enable_redact + def test_should_mask_sensitive_extra(self, test_client, session): + self.create_assets_with_sensitive_extra() + tz_datetime_format = self.default_time.replace("+00:00", "Z") + uri = "s3://bucket/key/1" + response = test_client.get( + f"/public/assets/{uri}", + ) + assert response.status_code == 200 + assert response.json() == { + "id": 1, + "uri": "s3://bucket/key/1", + "extra": {"password": "***"}, + "created_at": tz_datetime_format, + "updated_at": tz_datetime_format, + "consuming_dags": [], + "producing_tasks": [], + "aliases": [], + } + class TestQueuedEventEndpoint(TestAssets): def _create_asset_dag_run_queues(self, dag_id, asset_id, session): @@ -593,3 +718,23 @@ def test_invalid_attr_not_allowed(self, test_client, session): response = test_client.post("/public/assets/events", json=event_invalid_payload) assert response.status_code == 422 + + @pytest.mark.usefixtures("time_freezer") + @pytest.mark.enable_redact + def test_should_mask_sensitive_extra(self, test_client, session): + self.create_assets() + event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} + response = test_client.post("/public/assets/events", json=event_payload) + assert response.status_code == 200 + assert response.json() == { + "id": mock.ANY, + "asset_id": 1, + "uri": "s3://bucket/key/1", + "extra": {"password": "***", "from_rest_api": True}, + "source_task_id": None, + "source_dag_id": None, + "source_run_id": None, + "source_map_index": -1, + "created_dagruns": [], + "timestamp": self.default_time.replace("+00:00", "Z"), + }