diff --git a/CHANGELOG.md b/CHANGELOG.md index 6188ede0ac..fb97ec5153 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o - Bumped supported Python versions to `3.10.16` and `3.9.21` [#5840](https://github.com/ethyca/fides/pull/5840) - Update the privacy request detail page to a new layout and improved styling [#5824](https://github.com/ethyca/fides/pull/5824) - Updated privacy request handling to still succeed if not all identities are provided [#5836](https://github.com/ethyca/fides/pull/5836) +- Refactored privacy request processing to never re-use sessions [#5862](https://github.com/ethyca/fides/pull/5862) ### Developer Experience diff --git a/src/fides/api/task/graph_task.py b/src/fides/api/task/graph_task.py index e12979f760..0b05127fc0 100644 --- a/src/fides/api/task/graph_task.py +++ b/src/fides/api/task/graph_task.py @@ -36,7 +36,8 @@ ConnectionType, ) from fides.api.models.datasetconfig import DatasetConfig -from fides.api.models.policy import Policy +from fides.api.models.policy import Policy, Rule +from fides.api.models.privacy_preference import PrivacyPreferenceHistory from fides.api.models.privacy_request import ExecutionLog, PrivacyRequest, RequestTask from fides.api.schemas.policy import ActionType, CurrentStep from fides.api.schemas.privacy_request import ExecutionLogStatus @@ -54,7 +55,9 @@ make_immutable, make_mutable, ) -from fides.api.util.consent_util import add_errored_system_status_for_consent_reporting +from fides.api.util.consent_util import ( + add_errored_system_status_for_consent_reporting_on_preferences, +) from fides.api.util.logger import Pii from fides.api.util.logger_context_utils import LoggerContextKeys from fides.api.util.saas_util import FIDESOPS_GROUPED_INPUTS @@ -138,13 +141,7 @@ def result(*args: Any, **kwargs: Any) -> Any: self.resources.request.id, ) self.log_skipped(action_type, exc) - for pref in self.resources.request.privacy_preferences: - # For consent reporting, also caching the given system as skipped for all historical privacy preferences. - pref.cache_system_status( - self.resources.session, - self.connector.configuration.system_key, - ExecutionLogStatus.skipped, - ) + self.cache_system_status_for_preferences() return default_return except BaseException as ex: # pylint: disable=W0703 traceback.print_exc() @@ -164,11 +161,7 @@ def result(*args: Any, **kwargs: Any) -> Any: action_type.value ] # Convert ActionType into a CurrentStep, no longer coerced with Pydantic V2 ) - add_errored_system_status_for_consent_reporting( - self.resources.session, - self.resources.request, - self.connector.configuration, - ) + self.add_error_status_for_consent_reporting() if not self.request_task.id: # TODO Remove when we stop support for DSR 2.0 # Re-raise to stop privacy request execution on failure for @@ -730,6 +723,48 @@ def consent_request(self, identity: Dict[str, Any]) -> bool: self.log_end(ActionType.consent) return output + def cache_system_status_for_preferences(self) -> None: + """ + Calls cache_system_status for all historical privacy preferences for the given request. + + Purposely uses a new session. + """ + + privacy_request_id = self.resources.request.id + + with get_db() as db: + + privacy_preferences = db.query(PrivacyPreferenceHistory).filter( + PrivacyPreferenceHistory.privacy_request_id == privacy_request_id + ) + for pref in privacy_preferences: + # For consent reporting, also caching the given system as skipped for all historical privacy preferences. + pref.cache_system_status( + db, + self.connector.configuration.system_key, # type: ignore[arg-type] + ExecutionLogStatus.skipped, + ) + + def add_error_status_for_consent_reporting(self) -> None: + """ + Adds the errored system status for all historical privacy preferences for the given request that + are deemed relevant for the connector failure (i.e if they had a "pending" log added to them). + + Purposely uses a new session. + """ + privacy_request_id = self.resources.request.id + with get_db() as db: + privacy_preferences = ( + db.query(PrivacyPreferenceHistory) + .filter( + PrivacyPreferenceHistory.privacy_request_id == privacy_request_id + ) + .all() + ) + add_errored_system_status_for_consent_reporting_on_preferences( + db, privacy_preferences, self.connector.configuration + ) + def collect_queries( traversal: Traversal, resources: TaskResources @@ -816,39 +851,45 @@ def build_affected_field_logs( }] """ - targeted_field_paths: Dict[FieldAddress, str] = {} + policy_id = policy.id - for rule in policy.rules: # type: ignore[attr-defined] - if rule.action_type != action_type: - continue - rule_categories: List[str] = rule.get_target_data_categories() - if not rule_categories: - continue + with get_db() as db: - collection_categories: Dict[ - str, List[FieldPath] - ] = node.collection.field_paths_by_category # type: ignore - for rule_cat in rule_categories: - for collection_cat, field_paths in collection_categories.items(): - if collection_cat.startswith(rule_cat): - targeted_field_paths.update( - { - node.address.field_address(field_path): collection_cat - for field_path in field_paths - } - ) + rules = db.query(Rule).filter(Rule.policy_id == policy_id) - ret: List[Dict[str, Any]] = [] - for field_address, data_categories in targeted_field_paths.items(): - ret.append( - { - "path": field_address.value, - "field_name": field_address.field_path.string_path, - "data_categories": [data_categories], - } - ) + targeted_field_paths: Dict[FieldAddress, str] = {} + + for rule in rules: # type: ignore[attr-defined] + if rule.action_type != action_type: + continue + rule_categories: List[str] = rule.get_target_data_categories() + if not rule_categories: + continue + + collection_categories: Dict[ + str, List[FieldPath] + ] = node.collection.field_paths_by_category # type: ignore + for rule_cat in rule_categories: + for collection_cat, field_paths in collection_categories.items(): + if collection_cat.startswith(rule_cat): + targeted_field_paths.update( + { + node.address.field_address(field_path): collection_cat + for field_path in field_paths + } + ) + + ret: List[Dict[str, Any]] = [] + for field_address, data_categories in targeted_field_paths.items(): + ret.append( + { + "path": field_address.value, + "field_name": field_address.field_path.string_path, + "data_categories": [data_categories], + } + ) - return ret + return ret def build_consent_dataset_graph(datasets: List[DatasetConfig]) -> DatasetGraph: diff --git a/src/fides/api/util/consent_util.py b/src/fides/api/util/consent_util.py index 072313aff5..b5a174abc3 100644 --- a/src/fides/api/util/consent_util.py +++ b/src/fides/api/util/consent_util.py @@ -214,15 +214,30 @@ def add_errored_system_status_for_consent_reporting( Deeming them relevant if they already had a "pending" log added to them. """ - for pref in privacy_request.privacy_preferences: # type: ignore[attr-defined] + add_errored_system_status_for_consent_reporting_on_preferences(db, privacy_request.privacy_preferences, connection_config) # type: ignore[attr-defined] + + +def add_errored_system_status_for_consent_reporting_on_preferences( + db: Session, + privacy_preferences: List[PrivacyPreferenceHistory], + connection_config: ConnectionConfig, +) -> None: + """ + Cache an errored system status for consent reporting on just the subset + of preferences that were deemed relevant for the connector on failure, + from the provided list of preferences. + + Deeming them relevant if they already had a "pending" log added to them. + """ + for preference in privacy_preferences: if ( - pref.affected_system_status - and pref.affected_system_status.get(connection_config.system_key) + preference.affected_system_status + and preference.affected_system_status.get(connection_config.system_key) == ExecutionLogStatus.pending.value ): - pref.cache_system_status( + preference.cache_system_status( db, - connection_config.system_key, + connection_config.system_key, # type: ignore[arg-type] ExecutionLogStatus.error, )